Any good OSS non-trivial TCP servers to use as a reference?

I’m working on an RTMP server in Elixir. After reading what some people said on a previous post I made here, I started wondering whether I’m designing this well, especially now that I’m starting to get into some of the more complicated aspects of the system.

To be clear, this is my first Elixir (and first non-toy BEAM) project, and I usually work in C#.

I could be over-thinking this, but I seem to be stuck on the architecture for handling, processing, and properly reacting to incoming network traffic.

The main issue I keep running into is that most examples assume either a text, line-based protocol or a binary protocol with a 4-byte packet length header. In both cases, the more complicated examples I’ve found use process messages (and active once). Unfortunately, as far as I can tell, that isn’t available to me, because the headers of each packet (and the amount of data in each packet) have to be parsed out manually and are even conditional.

Does anyone know of any good OSS projects I could use as a reference to get a good handle on how experienced devs handle more complicated TCP scenarios?


Have you considered using Ranch? It handles a lot of the basic architecture for you and gives you a good framework for building TCP-based services with. It also seems to have SSL support, so maybe you get free RTMPS support out of the box, if you’re really lucky.


Using Ranch from Elixir is a small guide that further explains how to set up a basic TCP-listener/handler using Elixir with Ranch.

AFAIK there are no RTMP-projects in Elixir to look at, although there are some that have been written in Erlang.

As for conditional pattern-matching on binaries (such as raw packet data): this is something Elixir is incredibly good at. Here is a guide about binary pattern-matching and writing a handler for a UDP packet protocol.
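As a rough illustration of that kind of conditional parsing, here is a sketch of decoding an RTMP chunk basic header, whose length (1, 2 or 3 bytes) depends on the low 6 bits of its first byte. The module name `BasicHeader` is hypothetical, and returning `:incomplete` when not enough bytes have arrived is a design assumption, not anything taken from an existing library.

```elixir
defmodule BasicHeader do
  @moduledoc """
  Hypothetical sketch: decodes an RTMP chunk basic header.
  Returns {:ok, fmt, chunk_stream_id, rest} or :incomplete.
  """

  # Low 6 bits == 0: 2-byte form, chunk stream id = second byte + 64
  def parse(<<fmt::2, 0::6, b::8, rest::binary>>), do: {:ok, fmt, b + 64, rest}

  # Low 6 bits == 1: 3-byte form, id = third byte * 256 + second byte + 64
  def parse(<<fmt::2, 1::6, b1::8, b2::8, rest::binary>>),
    do: {:ok, fmt, b2 * 256 + b1 + 64, rest}

  # Low 6 bits 2..63: 1-byte form, the id is stored directly
  def parse(<<fmt::2, csid::6, rest::binary>>) when csid >= 2, do: {:ok, fmt, csid, rest}

  # Not enough bytes yet (e.g. a 2-byte form whose second byte hasn't arrived)
  def parse(_), do: :incomplete
end
```

For example, `BasicHeader.parse(<<1::2, 3::6, 1, 2>>)` returns `{:ok, 1, 3, <<1, 2>>}`, while `BasicHeader.parse(<<0::2, 0::6>>)` returns `:incomplete` because the second byte is still missing.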


(Sorry, this post may be a bit more stream of consciousness than I intended, but if nothing else it’s helping me rubber duck a bit. Hopefully it makes as much sense outside of my head as it does inside.)

I am using Ranch, but I may be missing how it can make my life easier beyond socket pooling and transport wrapping.

The code I have in my protocol handler right now looks like:

```elixir
defmodule RtmpServer.Handler do
  @moduledoc "Handles the RTMP socket connection"
  require Logger

  defmodule State do
    @moduledoc false
    defstruct ip: nil,
              previous_headers: %{},
              connection_details: %RtmpCommon.ConnectionDetails{}
  end

  @doc "Starts the handler for an accepted socket"
  def start_link(ref, socket, transport, opts) do
    pid = spawn_link(__MODULE__, :init, [ref, socket, transport, opts])
    {:ok, pid}
  end

  def init(ref, socket, transport, _opts) do
    # Ranch 1.x API; Ranch 1.6+ also provides :ranch.handshake/1 for this step
    :ok = :ranch.accept_ack(ref)

    {:ok, {ip, _port}} = :inet.peername(socket)
    client_ip_string = format_ip(ip)

    Logger.info("#{client_ip_string}: client connected")

    case RtmpServer.Handshake.process(socket, transport) do
      {:ok, client_epoch} ->
        Logger.debug("#{client_ip_string}: handshake successful")

        state = %State{
          ip: ip,
          connection_details: %RtmpCommon.ConnectionDetails{peer_epoch: client_epoch}
        }

        # read_next_chunk/3 only returns once the connection has failed
        {:error, reason} = read_next_chunk(socket, transport, state)
        Logger.debug("#{client_ip_string}: connection error: #{inspect(reason)}")

      {:error, reason} ->
        Logger.info("#{client_ip_string}: handshake failed (#{inspect(reason)})")
    end
  end

  def read_next_chunk(socket, transport, state = %State{}) do
    case RtmpCommon.Chunking.read_next_chunk(socket, transport, state.previous_headers) do
      {:ok, {updated_headers, header, data}} ->
        process_chunk(socket, transport, %{state | previous_headers: updated_headers}, header, data)

      {:error, reason} ->
        {:error, reason}
    end
  end

  defp process_chunk(socket, transport, state, chunk_header, chunk_data) do
    client_ip = format_ip(state.ip)

    result =
      with {:ok, received_message} <-
             RtmpCommon.Messages.Parser.parse(chunk_header.message_type_id, chunk_data),
           :ok <- log_received_message(client_ip, received_message),
           do: RtmpCommon.MessageHandler.handle(received_message, state.connection_details)

    case result do
      {:ok, {new_connection_details, _response}} ->
        # The response is not sent back to the client yet
        __MODULE__.read_next_chunk(socket, transport, %{state | connection_details: new_connection_details})

      {:error, {:no_handler_for_message, message_type}} ->
        Logger.debug("#{client_ip}: no handler for message: #{inspect(message_type)}")
        __MODULE__.read_next_chunk(socket, transport, state)

      # Any other error (e.g. a parse failure) ends the loop instead of raising CaseClauseError
      {:error, reason} ->
        {:error, reason}
    end
  end

  defp log_received_message(client_ip, message) do
    Logger.debug("#{client_ip}: Message received: #{inspect(message)}")
  end

  # :inet.ntoa/1 formats both IPv4 and IPv6 addresses
  defp format_ip(ip), do: ip |> :inet.ntoa() |> to_string()
end
```

That code loops, reading RTMP chunks over and over again: it parses each chunk to get the header and data, parses an RTMP message out of the data, then handles that specific message.

The problem I am having, and where I am probably over-thinking it, is that in the past 2-3 months of experimenting and playing around with random small projects in Erlang and Elixir, the power of reacting to messages has been ingrained in me.

The problem is that RtmpCommon.Chunking.read_next_chunk() is forced to make many manual calls to transport.recv() because of how the RTMP protocol works (I have to read the first byte to know how long the next piece of data is, then parse that piece and compare it to previously received data to know how long the next bit of data is, etc…).
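To make that "many manual recv calls" style concrete, here is a minimal sketch that reads only the chunk basic header with blocking, exact-length reads on a passive (`active: false`, `:binary`) socket. It calls `:gen_tcp.recv/3` directly; with Ranch the equivalent call is `transport.recv(socket, length, timeout)`. The module name and the 5-second timeout are assumptions. Each `recv` blocks the calling process, so no other message in its mailbox is handled while it waits, which is the reactivity problem described above.

```elixir
defmodule PassiveReader do
  # Hypothetical sketch: blocking, exact-length reads on a passive socket.
  # Assumes the socket was opened in :binary mode with active: false.
  @timeout 5_000

  def read_basic_header(socket) do
    # Read 1 byte first; its low 6 bits decide how many more bytes to read
    with {:ok, <<fmt::2, csid_bits::6>>} <- :gen_tcp.recv(socket, 1, @timeout),
         {:ok, csid} <- read_csid(socket, csid_bits) do
      {:ok, fmt, csid}
    end
  end

  # 2-byte form: one more byte
  defp read_csid(socket, 0) do
    with {:ok, <<b>>} <- :gen_tcp.recv(socket, 1, @timeout), do: {:ok, b + 64}
  end

  # 3-byte form: two more bytes
  defp read_csid(socket, 1) do
    with {:ok, <<b1, b2>>} <- :gen_tcp.recv(socket, 2, @timeout),
         do: {:ok, b2 * 256 + b1 + 64}
  end

  # 1-byte form: nothing more to read
  defp read_csid(_socket, csid), do: {:ok, csid}
end
```

Every later field (message header, payload) needs the same pattern of "read a little, decide, read more", and errors such as `{:error, :closed}` fall straight through the `with`.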

That seems to prevent me from having a message-based process (or even using a gen_server/gen_fsm), which would help simplify the architecture. Being able to react to messages in a timely manner will be extremely useful for RTMP clients (watchers of video), pings, external notifications from other processes, etc…

One of the workarounds I have thought of so far is to add a receive at the beginning of the read_next_chunk() function, but that means that if the TCP client does not send data, I will end up delaying receiving and responding to even a single BEAM message (and there is definitely no guarantee of constant TCP traffic coming into the server).
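For reference, a non-blocking check of the mailbox is usually written with `after 0`. This sketch (module and function names are hypothetical) handles every message already queued and then returns immediately. It does not remove the limitation described above: the blocking `recv` that follows can still delay any message that arrives while it waits for TCP data.

```elixir
defmodule MailboxPoll do
  # Hypothetical helper: processes every message already in the mailbox
  # with `handler`, threading `state` through, then returns without waiting.
  def drain(handler, state) do
    receive do
      message -> drain(handler, handler.(message, state))
    after
      0 -> state
    end
  end
end
```

If `:ping` and then `:notify` are the only queued messages, `MailboxPoll.drain(fn msg, acc -> [msg | acc] end, [])` returns `[:notify, :ping]`.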

Another idea I had is to make this handler module do nothing but receive/parse chunks and then send them to a gen_server or gen_fsm that actually processes each RTMP chunk. That would allow the second process to react to BEAM messages (including timer messages it sends itself), with the benefit of isolating it from the chunk reading process. The downside is that either the second process has to have the socket and transport so it can format and send responses itself, or it needs to be a synchronous gen_server that returns all of the responses the handler should send back to the client (plus some type of queue for pings and other messages that aren’t direct responses).
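A minimal sketch of that second idea: the chunk-reading process casts each parsed chunk to a separate GenServer, which stays free to react to timers and other BEAM messages. To avoid handing it the socket directly, this version takes a `reply_fun` (a hypothetical parameter; in the real handler it might be `fn data -> transport.send(socket, data) end`, since a socket can be written to from a process other than its owner). The `{:ack, chunk}` reply and the `:ping` timer are placeholders, not real RTMP messages.

```elixir
defmodule Session do
  use GenServer

  # Hypothetical session process. The chunk-reading process casts each parsed
  # chunk here; `reply_fun` (an assumption) writes replies back to the client.

  def start_link(reply_fun, ping_ms),
    do: GenServer.start_link(__MODULE__, {reply_fun, ping_ms})

  # Called by the chunk-reading process for every complete chunk
  def chunk_received(pid, chunk), do: GenServer.cast(pid, {:chunk, chunk})

  @impl true
  def init({reply_fun, ping_ms}) do
    Process.send_after(self(), :ping, ping_ms)
    {:ok, %{reply: reply_fun, ping_ms: ping_ms}}
  end

  @impl true
  def handle_cast({:chunk, chunk}, state) do
    # Placeholder: a real session would decode the chunk and reply accordingly
    state.reply.({:ack, chunk})
    {:noreply, state}
  end

  @impl true
  def handle_info(:ping, state) do
    # Timer messages are handled even when the client sends nothing
    state.reply.(:ping)
    Process.send_after(self(), :ping, state.ping_ms)
    {:noreply, state}
  end
end
```

Passing a function instead of the socket also keeps the session testable without any network code.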

The last idea is to just use {active, once}, {packet, raw} so that TCP data is received as raw packets, and write the protocol handler module so it keeps incomplete RTMP packets in state and finishes processing them when the next TCP data message arrives. At that point the whole protocol handler can live inside a single gen_server or gen_fsm and be fully reactive.

After typing that out, I guess the last idea is probably going to be the best way to go.
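A minimal sketch of that last idea, assuming plain `:gen_tcp` sockets (with Ranch, `transport.setopts/2` would replace `:inet.setopts/2`). The module name is hypothetical, and the toy framing (a 1-byte length followed by that many bytes) stands in for real RTMP chunk parsing. Leftover bytes stay in `buffer` until the next `{:tcp, ...}` message completes them, and `active: :once` is re-armed after each message.

```elixir
defmodule ActiveOnceHandler do
  use GenServer
  # Hypothetical sketch of an {active, once} / {packet, raw} connection process.
  # Toy framing (1-byte length + payload) stands in for RTMP chunk parsing.

  def start_link(socket), do: GenServer.start_link(__MODULE__, socket)

  # Call after :gen_tcp.controlling_process(socket, pid), so that {:tcp, ...}
  # messages are delivered to this process
  def activate(pid), do: GenServer.cast(pid, :activate)

  def chunks(pid), do: GenServer.call(pid, :chunks)

  @impl true
  def init(socket), do: {:ok, %{socket: socket, buffer: <<>>, chunks: []}}

  @impl true
  def handle_cast(:activate, state) do
    :ok = :inet.setopts(state.socket, active: :once)
    {:noreply, state}
  end

  @impl true
  def handle_call(:chunks, _from, state), do: {:reply, state.chunks, state}

  @impl true
  def handle_info({:tcp, socket, data}, state) do
    # Append new bytes to any leftover partial chunk, then take out complete ones
    {complete, rest} = split_chunks(state.buffer <> data, [])
    # Ask for exactly one more {:tcp, ...} message
    :ok = :inet.setopts(socket, active: :once)
    # A real handler would process each chunk here instead of just storing it
    {:noreply, %{state | buffer: rest, chunks: state.chunks ++ complete}}
  end

  def handle_info({:tcp_closed, _socket}, state), do: {:stop, :normal, state}
  def handle_info({:tcp_error, _socket, reason}, state), do: {:stop, reason, state}

  defp split_chunks(<<len, payload::binary-size(len), rest::binary>>, acc),
    do: split_chunks(rest, [payload | acc])

  defp split_chunks(rest, acc), do: {Enum.reverse(acc), rest}
end
```

Because timers and other processes' messages arrive in the same mailbox as `{:tcp, ...}`, this process never blocks on the socket.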


So apologies, I’m going to do a bit more rubber ducking in this thread now that I’ve stepped back from this project for the weekend and given myself time to reflect. Just tell me to shut up if bumping my own thread with this is annoying :) (though maybe someone else will get some insight from these ramblings).

So the more I think about it, active once raw packet retrieval is the way to go, so that I can keep my server process reactive to BEAM messages. The trick with that, of course, is handling incomplete and/or partial RTMP chunks coming down the pipe, since there is no guarantee that 1 TCP packet (or whatever gen_tcp actually considers a packet) contains exactly 1 complete RTMP chunk. I need to handle situations where 1 packet contains less than 1 RTMP chunk, or where 1 packet contains multiple chunks (with an incomplete chunk at the end).

There are two ways I see to handle this.

The first is to have the gen_server protocol handler keep state that includes the previous chunks successfully received (since I need them in order to parse out future chunks), the RTMP chunk I am in the process of parsing (which was incomplete as of the last TCP packet received), an atom representing which stage of chunk parsing we stopped at (due to running out of bytes in the last packet), and the remaining unparsed binary from the last packet. Each time a new TCP message is received, I call a parsing module that takes the current state and the newly received binary and returns an updated state plus any complete chunks it was able to pull out of the packet. If there were any complete

What’s really cool about this idea is that Elixir’s functional pattern matching makes this so easy to work with. If there are 5 stages of parsing, I can just have 5 versions of the parse function, one for each stage; each stage calls the next until we run out of binary, and then returns the results. This means that handling partial chunks may actually be much easier than I had initially feared. It also makes it relatively easy to test, as I can just say “for this state and this binary, these completed chunks and this state should be returned”.
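Here is a rough sketch of that stage-per-clause idea as a pure module the gen_server can keep in its state. `ChunkReader` and its struct fields are hypothetical, and the simplifications listed in the moduledoc mean this is not a complete RTMP chunk parser. The header layouts follow the RTMP chunk format: a basic header, then an 11, 7, 3 or 0-byte message header selected by `fmt`.

```elixir
defmodule ChunkReader do
  @moduledoc """
  Hypothetical sketch of an incremental, stage-based RTMP chunk reader.
  Simplifying assumptions: every message fits in a single chunk (no chunk-size
  splitting), no extended timestamps, timestamp fields are stored raw, and a
  fmt 1-3 chunk always follows an earlier chunk on the same chunk stream.
  """
  defstruct stage: :basic_header, buffer: <<>>, current: %{}, previous_headers: %{}

  @doc "Feeds newly received bytes; returns {completed_chunks, new_state}."
  def feed(%__MODULE__{} = state, data) when is_binary(data),
    do: run(%{state | buffer: state.buffer <> data}, [])

  # Stage 1: basic header (1-3 bytes)
  defp run(%{stage: :basic_header} = s, acc) do
    case basic_header(s.buffer) do
      {:ok, fmt, csid, rest} ->
        run(%{s | stage: :message_header, buffer: rest, current: %{fmt: fmt, csid: csid}}, acc)

      :incomplete ->
        {Enum.reverse(acc), s}
    end
  end

  # Stage 2: message header; its size depends on fmt
  defp run(%{stage: :message_header, current: cur} = s, acc) do
    prev = Map.get(s.previous_headers, cur.csid, %{})

    case message_header(cur.fmt, s.buffer, prev) do
      {:ok, header, rest} ->
        run(%{s | stage: :payload, buffer: rest, current: Map.merge(cur, header)}, acc)

      :incomplete ->
        {Enum.reverse(acc), s}
    end
  end

  # Stage 3: payload; wait until `length` bytes are buffered
  defp run(%{stage: :payload, current: cur} = s, acc) do
    len = cur.length

    case s.buffer do
      <<payload::binary-size(len), rest::binary>> ->
        header = Map.delete(cur, :fmt)

        next = %{
          s
          | stage: :basic_header,
            buffer: rest,
            current: %{},
            previous_headers: Map.put(s.previous_headers, cur.csid, header)
        }

        run(next, [{header, payload} | acc])

      _ ->
        {Enum.reverse(acc), s}
    end
  end

  defp basic_header(<<fmt::2, 0::6, b, rest::binary>>), do: {:ok, fmt, b + 64, rest}
  defp basic_header(<<fmt::2, 1::6, b1, b2, rest::binary>>), do: {:ok, fmt, b2 * 256 + b1 + 64, rest}
  defp basic_header(<<fmt::2, csid::6, rest::binary>>) when csid >= 2, do: {:ok, fmt, csid, rest}
  defp basic_header(_), do: :incomplete

  # fmt 0: timestamp, length, type id, little-endian stream id (11 bytes)
  defp message_header(0, <<ts::24, len::24, type::8, stream::little-size(32), rest::binary>>, _prev),
    do: {:ok, %{timestamp: ts, length: len, type_id: type, stream_id: stream}, rest}

  # fmt 1: timestamp delta, length, type id (7 bytes); stream id reused
  defp message_header(1, <<delta::24, len::24, type::8, rest::binary>>, prev),
    do: {:ok, Map.merge(prev, %{timestamp_delta: delta, length: len, type_id: type}), rest}

  # fmt 2: timestamp delta only (3 bytes)
  defp message_header(2, <<delta::24, rest::binary>>, prev),
    do: {:ok, Map.put(prev, :timestamp_delta, delta), rest}

  # fmt 3: no message header, everything reused from the previous chunk
  defp message_header(3, rest, prev), do: {:ok, prev, rest}
  defp message_header(_fmt, _buffer, _prev), do: :incomplete
end
```

The test style from the post maps directly onto this: given a `%ChunkReader{}` and some bytes, assert on the returned chunks and the new state (including which `stage` it stopped at).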

The other idea is to use a gen_fsm to parse the incomplete chunks. In a previous thread someone mentioned that gen_fsm was made for dealing with network protocols, and I was having trouble understanding how, but this partial chunk handling situation seems to lend itself to it perfectly.

Every time a TCP packet is received (by the gen_server protocol handler), it forwards the packet to the FSM. The FSM has states based on which stage of the current RTMP chunk it is parsing, and keeps its own private state of previously received chunk headers (for future chunk parsing) as well as unread binary it previously received. Every time it parses a full chunk, it sends that chunk back to the protocol handler as a :chunk_read message. The protocol handler then has a handle_cast({:chunk_read, chunk}) function which handles the actual processing of that chunk.

The latter is actually extremely compelling to me, because it makes testing very easy. I can test the chunk reading functionality on its own (I don’t have to stand up state for each call; I can just send it multiple events with binary and verify it sends messages back with the chunks). And since the protocol handler is designed to receive messages saying a full chunk has been read (along with the chunk itself), I can test how the protocol handler handles full RTMP chunks without having to create a mock Ranch transport.

Of course, the downside to the latter method is the extra overhead of sending the data to the FSM process and then back to the protocol handler process, and I have to make sure the FSM is fast enough that heavy RTMP traffic on that connection doesn’t overload its mailbox.
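The separate-process FSM described above could be sketched roughly like this. Note that gen_fsm has been deprecated since OTP 20 in favor of gen_statem, so this sketch uses gen_statem. `ChunkFsm` is hypothetical, and a toy framing (1-byte length + payload) stands in for the RTMP chunk stages. Completed chunks are delivered with `GenServer.cast/2`, so a GenServer owner would see them in `handle_cast({:chunk_read, chunk}, state)` as the post describes.

```elixir
defmodule ChunkFsm do
  @behaviour :gen_statem
  # Hypothetical sketch: a separate parsing process built on gen_statem.
  # Toy framing (1-byte length + payload) stands in for RTMP chunk parsing.

  def start_link(owner), do: :gen_statem.start_link(__MODULE__, owner, [])

  # Called by the protocol handler for every {:tcp, socket, data} message
  def feed(fsm, data), do: :gen_statem.cast(fsm, {:data, data})

  @impl :gen_statem
  def callback_mode, do: :state_functions

  @impl :gen_statem
  def init(owner), do: {:ok, :awaiting_length, %{owner: owner, buffer: <<>>, len: nil}}

  # State functions: one per parsing stage
  def awaiting_length(:cast, {:data, data}, d),
    do: parse(:awaiting_length, %{d | buffer: d.buffer <> data})

  def awaiting_payload(:cast, {:data, data}, d),
    do: parse(:awaiting_payload, %{d | buffer: d.buffer <> data})

  defp parse(:awaiting_length, %{buffer: <<len, rest::binary>>} = d),
    do: parse(:awaiting_payload, %{d | buffer: rest, len: len})

  defp parse(:awaiting_payload, %{buffer: buf, len: len} = d) when byte_size(buf) >= len do
    <<payload::binary-size(len), rest::binary>> = buf
    # Deliver the chunk as a cast to the owning protocol handler
    GenServer.cast(d.owner, {:chunk_read, payload})
    parse(:awaiting_length, %{d | buffer: rest, len: nil})
  end

  # Not enough bytes for the current stage: stay there and wait for more
  defp parse(state, d), do: {:next_state, state, d}
end
```

Testing works as the post hopes: start the FSM with the test process as owner, feed it bytes in arbitrary pieces, and assert on the `{:"$gen_cast", {:chunk_read, chunk}}` messages that come back.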

What you might want to consider as an alternative to gen_fsm is Fsm. It lets you easily define a pure functional state machine that does not run in its own process. This might be great if you want to switch parse states inside the protocol handler process (instead of passing messages back and forth).


That’s actually perfect, thank you very much for posting that.

That also made me realize I’m thinking of state as a per-process thing for some reason, when really there’s no reason the process can’t just be a holder for state that another module manages (such as that FSM example).

I think I’m going to proceed with an in-process FSM (not 100% sure yet whether I’ll go with that library), but also initialize the FSM with an MFA callback to the protocol handler to handle any completed RTMP chunks. That will allow me to fully test both in isolation with no need for a mock transport at all, with the added benefit of everything running fully in memory (I’m not worried about processing a chunk blocking the reading of more chunks).
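A minimal sketch of that in-process design, in plain Elixir rather than the Fsm library's API: the parser is ordinary data held by the connection process, and for every completed chunk it calls `apply(module, function, [chunk, handler_state | extra_args])`, threading the handler's state through. `CallbackParser`, `ExampleHandler`, and the argument order are hypothetical, and a toy framing (1-byte length + payload) stands in for RTMP chunks.

```elixir
defmodule CallbackParser do
  # Hypothetical in-process parser; no extra process, just data plus an MFA.
  # Toy framing: 1-byte length followed by that many payload bytes.
  defstruct buffer: <<>>, on_chunk: nil

  def new({m, f, a} = mfa) when is_atom(m) and is_atom(f) and is_list(a),
    do: %__MODULE__{on_chunk: mfa}

  @doc "Feeds bytes; returns {parser, handler_state}."
  def feed(%__MODULE__{} = parser, data, handler_state),
    do: emit(%{parser | buffer: parser.buffer <> data}, handler_state)

  # A complete chunk is buffered: hand it to the callback, keep going
  defp emit(%{buffer: <<len, chunk::binary-size(len), rest::binary>>} = parser, handler_state) do
    {m, f, a} = parser.on_chunk
    emit(%{parser | buffer: rest}, apply(m, f, [chunk, handler_state | a]))
  end

  # Incomplete (or empty) buffer: wait for more bytes
  defp emit(parser, handler_state), do: {parser, handler_state}
end

defmodule ExampleHandler do
  # Hypothetical protocol-handler callback: records each chunk with a tag
  def handle_chunk(chunk, acc, tag), do: [{tag, chunk} | acc]
end
```

Because the callback returns the handler's new state instead of sending a message, both halves can be unit-tested as pure functions.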


hello, @KallDrexx,
I tried to implement authentication on :ranch_tcp, but without using SSL.
I’m interested in how the function RtmpServer.Handshake.process(socket, transport) is implemented.
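For anyone with the same question, here is a minimal sketch of the plain (unencrypted) RTMP handshake on the server side. It is not the code from KallDrexx's library, and it is a handshake rather than authentication. It reads C0 (version byte 3) and C1 (1536 bytes starting with a 4-byte timestamp and 4 zero bytes), replies with S0 + S1 + S2, then reads C2. Assumptions: `transport` is any module with `recv/3` and `send/2` (both `:gen_tcp` and `:ranch_tcp` qualify), a 5-second timeout, S2 that echoes C1 verbatim (a common simplification; the specification's second time field should hold the time C1 was read), and C2 that is read but not validated.

```elixir
defmodule SimpleHandshake do
  # Hypothetical sketch of the plain RTMP handshake (server side).
  @packet_size 1536
  @timeout 5_000

  def process(socket, transport) do
    with {:ok, <<3>>} <- transport.recv(socket, 1, @timeout),
         {:ok, <<client_epoch::32, _zero::32, _random::binary>> = c1} <-
           transport.recv(socket, @packet_size, @timeout),
         # S1: our timestamp (0 here), 4 zero bytes, then random filler
         s1 = <<0::32, 0::32, :crypto.strong_rand_bytes(@packet_size - 8)::binary>>,
         # S0 + S1 + S2, where S2 simply echoes C1 (a simplification)
         :ok <- transport.send(socket, [<<3>>, s1, c1]),
         # C2 should echo S1; this sketch reads it but does not validate it
         {:ok, _c2} <- transport.recv(socket, @packet_size, @timeout) do
      {:ok, client_epoch}
    else
      {:ok, <<version>>} -> {:error, {:unsupported_version, version}}
      {:error, reason} -> {:error, reason}
    end
  end
end
```

The return values match the `{:ok, client_epoch}` / `{:error, reason}` shape the handler code earlier in the thread expects.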

This might be a bit late, but I gave a talk on building a server in Elixir: http://crowdhailer.me/talks/2017-03-29/building-a-webserver-in-elixir/

Ace, the server I made, is a TCP server, and the HTTP layer is an extension on top of that. It might be useful.

Old post, but this should be stated:

This is where the socket’s packet parsing type is useful: if all of your packets are prepended with an N-byte size, it can hand you exactly those packets and nothing else. Erlang has a few built in. :slight_smile:
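Those built-in framings can be tried without opening a socket through `:erlang.decode_packet/3`, which accepts the same packet types as the socket `packet` option (for example `packet: 4` or `packet: :line`). A quick illustration:

```elixir
# A 4-byte big-endian length prefix, then that many bytes (prefix stripped)
{:ok, "abc", "rest"} = :erlang.decode_packet(4, <<0, 0, 0, 3, "abc", "rest">>, [])

# Not enough bytes yet: the caller has to wait for more data
{:more, _} = :erlang.decode_packet(4, <<0, 0, 0, 3, "ab">>, [])

# Newline-terminated lines; the newline stays in the packet
{:ok, "hello\n", "world"} = :erlang.decode_packet(:line, "hello\nworld", [])
```

None of these types can express a header whose length depends on its own contents and on previously received headers, which is the limitation raised in the reply.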

It can be interesting to use two processes as well. See: http://erlang.org/pipermail/erlang-questions/2003-February/007751.html

Do you have a direct link? You might be looking at an old version, as the current handshake code is in apps/rtmp/lib/rtmp/handshake.ex on the master branch of KallDrexx/elixir-media-libs on GitHub, and it doesn’t have that function. I may have forgotten to update the docs if you are referencing hex.pm.

Unfortunately, I don’t control the protocol (it’s pre-specced out by third parties) and thus I can’t rely on something like this. It’s much nicer when you have full control of both sides of the communication :slight_smile:

I figured, but it is still something good to say. :slight_smile:

Thanks