Prototyping an I/O pipelining framework

So this is something very, very early, and it may yet prove to be a stupid idea, but I figured I would write about it as it’s something I’ve been thinking a lot about lately. I’m going to use this opportunity to rubber duck for a bit to help turn my thoughts into more concrete ideas.

This weekend I started prototyping a small framework. The idea is that it should be relatively easy to create a pipeline of systems that transform and perform logic on I/O streams in a composable way, taking advantage of the actor model to give a good separation of concerns. I call it Prism. It’s born partially out of issues I have run into while trying to architect a live media streaming server, and from looking back at the architectural complexities of the basic RTMP servers I have created in the past.

Concepts

There are 3 main concepts that I’m looking at:

  • Handlers - An actor that can receive inputs (from parents) and outputs (from children), perform some type of logic on those inputs and outputs, and pass along its own data to children or parents (in the form of new inputs and outputs respectively).
  • Pipelines - A pipeline is a collection of handlers. I plan to have pipelines registered and able to report custom statistics from the handlers they manage.
  • Triggers - Processes that wait for specific conditions to create a new pipeline with specific handlers. For example a trigger might create a pipeline when a TCP connection comes in on port 1234, or I might manually activate a trigger for a pipeline with the purpose of writing compressed and encrypted data to a file.
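
To make the handler concept a bit more concrete, here is a rough sketch of what a handler contract could look like as an Elixir behaviour. None of this is Prism's actual API: the module names `HandlerSketch` and `UpcaseHandler`, the callback names, and the `{:input, data}` / `{:output, data}` / `:drop` return values are all assumptions made for illustration. In a real version each handler would presumably be its own process and these callbacks would run when a message arrives.

```elixir
defmodule HandlerSketch do
  @moduledoc "Hypothetical handler contract; not Prism's real API."

  # What the handler wants done with the data it just processed:
  # forward it to the child as input, forward it to the parent as
  # output, or swallow it.
  @type action :: {:input, term()} | {:output, term()} | :drop

  @callback init(args :: term()) :: {:ok, state :: term()}
  @callback handle_input(data :: term(), state :: term()) :: {action(), state :: term()}
  @callback handle_output(data :: term(), state :: term()) :: {action(), state :: term()}
end

defmodule UpcaseHandler do
  @moduledoc "Toy handler: upcases input on the way down, passes output through."
  @behaviour HandlerSketch

  @impl true
  def init(_args), do: {:ok, %{seen: 0}}

  @impl true
  def handle_input(data, state) when is_binary(data) do
    # Transform the data and count it, then hand it to the child.
    {{:input, String.upcase(data)}, %{state | seen: state.seen + 1}}
  end

  @impl true
  def handle_output(data, state) do
    # Nothing to do on the way back up; pass it to the parent unchanged.
    {{:output, data}, state}
  end
end
```

Keeping per-handler state in the return value is one way a pipeline could later ask each handler for its own statistics.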

Simple Example

The simple example I used for the initial prototyping was an echo server that reverses whatever text you pass into it. I defined the pipeline for the supervisor as:

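children = [
  {Prism.Trigger, [
    # Trigger module and its arguments: listener name, port, socket options
    Prism.Triggers.RanchTcpAcceptor, [:test, 1111, [active: :once, packet: :line]],
    # Handlers for each new pipeline, in order from the socket downwards
    [
      {Prism.Handlers.ReverseTextHandler, []},
      {Prism.Handlers.EchoHandler, []}
    ]
  ]}
]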

I can envision macros making the pipeline definitions a bit prettier to organize.

Essentially this creates a TCP trigger on port 1111. When a TCP connection comes in, the RanchTcpAcceptor creates a new pipeline with a TcpSocketHandler at the top (implied by the acceptor). That handler waits until it receives a line of text and passes it to the ReverseTextHandler as input. The ReverseTextHandler reverses the string and passes it to the EchoHandler as input. The EchoHandler then passes it back to the ReverseTextHandler as output, the ReverseTextHandler passes it back without modification to the TcpSocketHandler as output, and the TcpSocketHandler sends it over the socket.
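
The round trip described above can be sketched with plain processes. This is only an illustration of the message flow, not how Prism is implemented: the module name `EchoFlowSketch`, the `:reverse` and `:echo` atoms, and the `{:input, data}` / `{:output, data}` message shapes are all assumptions, and the calling process stands in for the TcpSocketHandler.

```elixir
defmodule EchoFlowSketch do
  # Start the chain described by `kinds` and return the pid of the first
  # handler. Each handler starts its own child, so parent and child both
  # know each other's pid.
  def start([kind | rest], parent) do
    spawn_link(fn ->
      child = if rest == [], do: nil, else: start(rest, self())
      loop(kind, parent, child)
    end)
  end

  defp loop(kind, parent, child) do
    receive do
      # Input travels down the pipeline, away from the socket.
      {:input, data} -> handle_input(kind, data, parent, child)
      # Output travels back up unchanged in this toy example.
      {:output, data} -> send(parent, {:output, data})
    end

    loop(kind, parent, child)
  end

  # The reverse handler transforms input and hands it to its child
  # (so it must not be the last handler in the list).
  defp handle_input(:reverse, data, _parent, child),
    do: send(child, {:input, String.reverse(data)})

  # The echo handler turns input around and sends it back as output.
  defp handle_input(:echo, data, parent, _child),
    do: send(parent, {:output, data})
end

# The calling process plays the role of the socket handler here.
top = EchoFlowSketch.start([:reverse, :echo], self())
send(top, {:input, "hello"})

receive do
  {:output, text} -> IO.puts(text) # prints "olleh"
after
  1_000 -> IO.puts("no reply")
end
```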

The interesting thing about this format is you can comment out the ReverseTextHandler and everything still functions (just without string reversal). You can add in something like a ZlibCompressionHandler which decompresses all inputs and compresses outputs, all without modifying the other handlers, or even add an encryption handler in between to handle encryption/decryption of echoed content.
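
As a rough illustration of that composability, the sketch below models each handler as a pair of functions (one for input, one for output) and shows a zlib handler being slotted in without touching the reverse handler. The `ComposeSketch` module and its function names are hypothetical. Only `:zlib.compress/1` and `:zlib.uncompress/1` are real, from Erlang's standard library. The echo step is folded into `round_trip/2` rather than being its own handler.

```elixir
defmodule ComposeSketch do
  # A handler here is just a map with two functions: one applied to input
  # travelling down the pipeline, one applied to output travelling back up.
  def zlib, do: %{input: &:zlib.uncompress/1, output: &:zlib.compress/1}
  def reverse_text, do: %{input: &String.reverse/1, output: fn data -> data end}

  # Feed `data` down through every handler, echo it at the bottom, then
  # feed the result back up through the same handlers in reverse order.
  def round_trip(handlers, data) do
    echoed = Enum.reduce(handlers, data, fn handler, acc -> handler.input.(acc) end)

    handlers
    |> Enum.reverse()
    |> Enum.reduce(echoed, fn handler, acc -> handler.output.(acc) end)
  end
end

# Client sends compressed text; the reply comes back compressed and reversed.
handlers = [ComposeSketch.zlib(), ComposeSketch.reverse_text()]
reply = ComposeSketch.round_trip(handlers, :zlib.compress("hello"))
IO.puts(:zlib.uncompress(reply)) # prints "olleh"

# Dropping the reverse handler leaves the rest working as before.
plain = ComposeSketch.round_trip([ComposeSketch.zlib()], :zlib.compress("hello"))
IO.puts(:zlib.uncompress(plain)) # prints "hello"
```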

More Complex Scenarios

Let’s say you are making a multi-user, multi-room chat server, but you want several options for how users connect: over telnet (with a simpler UX than IRC), with any existing IRC client, or via websockets. Let’s also say that we want to log all user chats to Elasticsearch so they can be searched later on. The theory is we can define the following pipelines:

[
  {
    Prism.Trigger, [Prism.Triggers.RanchTcpAcceptor, [:telnet_listener, 1111, [active: :once, packet: :line]],
      [
        {Prism.Handlers.SimpleChatProtocolHandler, []},
        {Prism.Handlers.ChatLogicHandler, []}
      ]
    ]
  },
  {
    Prism.Trigger, [Prism.Triggers.RanchTcpAcceptor, [:irc_listener, 6667, [active: :once, packet: :line]],
      [
        {Prism.Handlers.IrcToSimpleProtocol, []},
        {Prism.Handlers.SimpleChatProtocolHandler, []},
        {Prism.Handlers.ChatLogicHandler, []}
      ]
    ]
  },
  {
    Prism.Trigger, [Prism.Triggers.RanchTcpAcceptor, [:websocket_listener, 8080, [active: :once, packet: :line]],
      [
        {Prism.Handlers.WebsocketProtocolHandler, []},
        {Prism.Handlers.SimpleChatProtocolHandler, []},
        {Prism.Handlers.ChatLogicHandler, []}
      ]
    ]
  },
  {
    Prism.Trigger, [Prism.Triggers.OnDemandTrigger, [:logging],
      [
        {Prism.Handlers.ElasticSearchClientHandler, [{127, 0, 0, 1}, 9999]},
        {Prism.Handlers.ChatLoggingHandler, []}
      ]
    ]
  }
]

The idea is that we code the following handlers to support the system:

  • SimpleChatProtocolHandler - Simple handler that takes a line of ASCII text as input and tries to parse commands out of it (what message to send, where to send it, rooms to join, your name, etc…). It would transform the line of text into a more “strongly” typed command tuple for the ChatLogicHandler. Likewise it would take any output command from the ChatLogicHandler and transform it from its tuple form into a line of text to send back to the client.
  • IrcToSimpleProtocol - Handler that takes IRC commands as input and transforms them into lines of text that the SimpleChatProtocolHandler can understand, and transforms any output it receives into a valid IRC command/message.
  • WebsocketProtocolHandler - Handler that takes input from the TCP socket and performs the websocket handshake and validations. Once the handshake is complete it unwraps the websocket frames and passes the inner chat protocol lines on to the SimpleChatProtocolHandler as input. Likewise it takes any output and wraps it in a valid websocket frame for the connection.
  • ElasticSearchClientHandler - Handler that maintains a persistent connection to Elasticsearch and sends all output it receives to the requested index.
  • ChatLoggingHandler - Handler that receives chat commands from a ChatLogicHandler and turns them into valid metrics for logging.
  • ChatLogicHandler - Takes valid chat command tuples as input and performs all logic for the application (nickname registrations, joining/leaving rooms, sending messages, etc…).
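
As an example of the text-to-tuple translation that the SimpleChatProtocolHandler would do, here is a minimal sketch. The command syntax (`/nick`, `/join`, `/leave`, `/msg`), the tuple shapes, and the module name `ChatProtocolSketch` are all invented for illustration. The post does not define the actual protocol.

```elixir
defmodule ChatProtocolSketch do
  # Input direction: parse one line of client text into a command tuple.
  def parse(line) do
    # Trim the line ending, then split into at most three parts so the
    # message text may itself contain spaces.
    case String.split(String.trim(line), " ", parts: 3) do
      ["/nick", name] -> {:ok, {:set_nick, name}}
      ["/join", room] -> {:ok, {:join, room}}
      ["/leave", room] -> {:ok, {:leave, room}}
      ["/msg", target, text] -> {:ok, {:message, target, text}}
      _ -> {:error, :unknown_command}
    end
  end

  # Output direction: render a command tuple coming from the logic handler
  # as a line of text for the client.
  def format({:message, from, room, text}), do: "[#{room}] <#{from}> #{text}\n"
  def format({:joined, name, room}), do: "* #{name} joined #{room}\n"
  def format({:error, reason}), do: "! #{reason}\n"
end
```

With a layout like this, the IRC and websocket handlers only need to produce and consume the same lines of text, which is what lets all three pipelines share the ChatLogicHandler.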

When a user connects, a pipeline is created based on the method they connected with (telnet, IRC client, or websocket). Each of those pipelines ends in a ChatLogicHandler, which registers the user with a central process that hosts an ETS table. It would then send an activation signal to the :logging on-demand trigger, which creates a logging pipeline for that specific user so that the user’s activity can be aggregated and logged.

When a second user connects and the first user tries to send them a message, the first user’s ChatLogicHandler would send a “message sent” command directly to the second user’s ChatLogicHandler, which would then send the message up the second user’s pipeline as output.
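
The registration and direct delivery steps could look roughly like the sketch below, using a shared ETS table keyed by nickname. The table name, the function names, and the `{:message_sent, from, text}` message shape are made up for illustration. In the design described above a central process would own the table, whereas here whichever process calls `init/0` owns it.

```elixir
defmodule ChatRegistrySketch do
  @table :chat_users_sketch

  # Create the shared table. :public lets every handler process write to it.
  def init do
    :ets.new(@table, [:named_table, :public, :set])
    :ok
  end

  # Register `nick` for the calling process; fails if the nickname is taken.
  def register(nick) do
    if :ets.insert_new(@table, {nick, self()}) do
      :ok
    else
      {:error, :nick_taken}
    end
  end

  # Look up the process registered under `to` and send the message
  # straight to it, bypassing any central router.
  def deliver(from, to, text) do
    case :ets.lookup(@table, to) do
      [{^to, pid}] ->
        send(pid, {:message_sent, from, text})
        :ok

      [] ->
        {:error, :no_such_user}
    end
  end
end
```

The receiving handler would then turn the `{:message_sent, from, text}` message into output and pass it up its own pipeline. A real version would also need to remove entries when a user's pipeline goes away, for example by monitoring the registered pid.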

Conclusion

So the whole point is to be able to build a system by creating small composable parts to (hopefully) make it easier to manage a complex networked server.

I’m not 100% sold on this yet. The prototyping has given me a bit more confidence that there’s a valuable nugget of an idea here, though part of me wonders if it adds more overhead than the problems it’s trying to solve. There is still pretty heavy coupling between the handlers, as they all need to know how to handle each other’s inputs and outputs correctly for this to work, so it may not be as simple as plugging some other user’s handlers in alongside your own.

I need to flesh out a lot of concepts, like how (or whether) pipelines should communicate with and discover each other, the best way to aggregate pipeline stats (e.g. each handler reporting its own stats so you can get holistic metrics on a pipeline), practical methods of activating dynamic on-demand triggers, etc…

Ideally this would also make it easy to create network servers that support hot-reloading, though there are a lot of module boundaries that make this difficult.

If this idea does end up working, I would probably use it as a framework to try to build a high-performance live streaming server, which would ideally support 10Gbps of data transfer with low latency. I do wonder whether the massive number of processes is compatible with that goal.

So anyways, if anyone actually got this far I’d love to hear thoughts (even negative ones) about this idea.
