Video Streaming with Elixir (wiki)

I tried to use the Elixir FFMPEG library, but it seems that it doesn’t support pipes at the moment.

While it is still in development, I came up with the following script, which writes an incoming live stream to disk as HLS segments.

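#!/bin/bash

# BASH_SOURCE is a bash feature, so the script needs bash rather than plain sh.
DIR=$(dirname "${BASH_SOURCE[0]}")
TMP_DIR="${DIR}/../tmp/pool"
OUTPUT_FILE="${TMP_DIR}/live_%00d.ts"
OUTPUT_PLAYLIST="${TMP_DIR}/live.m3u8"

# Make sure the output directory exists before ffmpeg writes to it.
mkdir -p "$TMP_DIR"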

ffmpeg \
  -i pipe:0 \
  -c:a copy \
  -c:v libx264 \
  -bufsize 1024k \
  -strict experimental \
  -map 0 \
  -s 320x240 \
  -f ssegment \
  -segment_list "$OUTPUT_PLAYLIST" -segment_time 10 \
  -flags +global_header \
  -segment_list_size 10 -segment_list_flags +live \
  -segment_list_type hls \
  "$OUTPUT_FILE"

Receiving video stream

To receive video from a source, we can listen for UDP packets using a GenServer:

defmodule Streaming.Incoming do
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  def init(:ok) do
    incoming_port = Application.get_env(:streaming, :incoming_port)

    # Active mode delivers every datagram to this process as a message.
    {:ok, socket} =
      :gen_udp.open(incoming_port, [:binary, {:active, true}, {:buffer, 1024}])

    # The socket becomes the server state.
    {:ok, socket}
  end

  def handle_info({:udp, _socket, _ip, port, data}, state) do
    IO.puts("---> Received #{byte_size(data)} bytes from #{port}")

    # Write to a bucket
    Streaming.Bucket.add(data)

    # Or write to the encoder, which pipes the data to FFMPEG
    Streaming.Encoder.encode(data)

    {:noreply, state}
  end

  # Ignore any other message
  def handle_info(_msg, state) do
    {:noreply, state}
  end
end

The Bucket module is the same as the key-value store module on the Elixir website. The Encoder is a module that writes to the FFMPEG script above. It is the place where the data should be written to multiple processes, one per output resolution.

As written, the incoming module stores every stream in the same file with the same configuration, so you will need to change that too if you handle more than one stream.
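
Streaming.Incoming reads its UDP port from the application environment under :incoming_port. A minimal sketch of the matching entry, assuming a config/config.exs file and port 3001 (the port the test command at the end of this post sends to):

```elixir
# config/config.exs (assumed location)
import Config

# Assumption: 3001 matches the udp://127.0.0.1:3001 target of the test command.
config :streaming, incoming_port: 3001
```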

Start the encoder process when the app or the stream starts:

def init(_state) do
  # Open a port to the external process,
  # which waits for data on stdin.
  port = Port.open({:spawn, "bin/read_hls"}, [:binary])

  {:ok, port}
end

Then write to it from Streaming.Incoming:

def encode(data) do
  GenServer.cast(:encoder, {:encode, data})
end

def handle_cast({:encode, data}, port) do
  port |> Port.command(data)

  {:noreply, port}
end
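
For reference, the encoder fragments above could be assembled into one module roughly like this. It is only a sketch: registering the server under the name :encoder is inferred from the cast in encode/1, and the :command option and the handle_info/2 clauses are my own additions so the module can be tried with any program that reads stdin.

```elixir
defmodule Streaming.Encoder do
  use GenServer

  # Assumption: the server is registered as :encoder, the name encode/1 casts to.
  # The :command option is hypothetical and defaults to the script from this post.
  def start_link(opts \\ []) do
    command = Keyword.get(opts, :command, "bin/read_hls")
    GenServer.start_link(__MODULE__, command, name: :encoder)
  end

  def encode(data) do
    GenServer.cast(:encoder, {:encode, data})
  end

  @impl true
  def init(command) do
    # The external process reads the stream from its stdin.
    port = Port.open({:spawn, command}, [:binary])
    {:ok, port}
  end

  @impl true
  def handle_cast({:encode, data}, port) do
    Port.command(port, data)
    {:noreply, port}
  end

  # Anything the external process prints arrives as a message; drop it here.
  @impl true
  def handle_info({port, {:data, _output}}, port), do: {:noreply, port}
  def handle_info(_msg, port), do: {:noreply, port}
end
```

Starting it with `Streaming.Encoder.start_link(command: "cat")` is a quick way to check the plumbing without FFMPEG installed.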

Streaming the live data

You can write the output of the FFMPEG process back to the key-value storage and stream it to the client. Keep in mind that, unlike streaming files, this takes a lot of memory. The storage is also last in, first out, so you will need to reverse the list before sending it.
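
To illustrate the ordering issue, here is a small sketch of a list-backed bucket. ChunkBucket and its functions are hypothetical names, not the actual Streaming.Bucket module; the point is only that prepending is cheap but leaves the newest chunk first, so the list is reversed when it is read.

```elixir
defmodule ChunkBucket do
  use Agent

  # Start with an empty list of chunks.
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> [] end)
  end

  # Prepend the chunk, so the newest one ends up at the head of the list.
  def add(bucket, chunk) do
    Agent.update(bucket, fn chunks -> [chunk | chunks] end)
  end

  # Reverse on read to get the chunks back in arrival order.
  def in_order(bucket) do
    Agent.get(bucket, &Enum.reverse/1)
  end
end
```

Every chunk stays in the Agent's memory until it is removed, which is where the memory cost mentioned above comes from.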

Another option, which I use in the example, is to stream the files that FFMPEG created. When a client requests the live stream, serve the playlist file (live.m3u8 in the script above), which lists the *.ts video segments.
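
If you want to check which segments a playlist currently points to, the file is plain text: lines starting with # are tags, and the remaining lines are segment names. A rough sketch, where PlaylistSketch is a made-up module name and the sample playlist in the usage note is illustrative rather than real FFMPEG output:

```elixir
defmodule PlaylistSketch do
  # Returns the segment entries of an m3u8 playlist given as a string.
  def segments(playlist) do
    playlist
    |> String.split("\n", trim: true)
    |> Enum.map(&String.trim/1)
    # Drop blank lines and tag/comment lines, which start with "#".
    |> Enum.reject(&(&1 == "" or String.starts_with?(&1, "#")))
  end
end
```

For example, `PlaylistSketch.segments("#EXTM3U\n#EXTINF:10.0,\nlive_0.ts\n#EXTINF:10.0,\nlive_1.ts\n")` returns `["live_0.ts", "live_1.ts"]`.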

Testing the stream

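Run this command to stream a media file to the server. The -re flag makes FFMPEG read the input at its native frame rate, so the packets are sent in real time instead of as fast as possible. The port (3001 here) must match the configured :incoming_port.

    ffmpeg -y \
      -re \
      -i "$MEDIA_FILE" \
      -c:a aac -ac 2 -ar 44100 \
      -c:v libx264 \
      -movflags frag_keyframe+empty_moov \
      -strict experimental \
      -crf 18 \
      -b:a 64k -b:v 64k \
      -pix_fmt yuv420p -profile:v baseline -level 1.3 -maxrate 192K -bufsize 1024k \
      -f mpegts -hls_time 9 -hls_list_size 0 \
      -s 1280x720 \
      -threads 12 \
      udp://127.0.0.1:3001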

You should see the playlist and the segment files under the ./tmp/pool folder.
