Membrane RTSP source

I’m building a project for streaming audio/video from a network webcam. I’ve successfully managed to get h264 stream data after modifying the built-in Transport. I believe my next goal is to set it up as a Membrane Source, but I can’t find any existing documentation in the hexdocs for this.

Has anyone built their own Membrane Source?
Am I on the right track or way off?

Hi @ConnorRigby, you’re totally right - source is the way to pass the stream into a Membrane pipeline. The docs you’re looking for are here: Membrane.Source. As described there, sources should implement Membrane.Element.Base and Membrane.Element.WithOutputPads behaviours. Various source implementations are available at Membrane GitHub, for example file source or portaudio source.
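To make that concrete, here’s a minimal source sketch, assuming a `use Membrane.Source` macro that pulls in the behaviours mentioned above; the callback and return shapes mirror the other elements in this thread, so treat it as an illustration rather than a tested element:

```elixir
defmodule MySource do
  use Membrane.Source
  alias Membrane.Buffer

  def_output_pad :output, caps: :any

  @impl true
  def handle_demand(:output, _size, :buffers, _ctx, state) do
    # emit a buffer whenever downstream asks for one;
    # a real source would read from a socket or device here
    {{:ok, buffer: {:output, %Buffer{payload: <<0>>}}}, state}
  end
end
```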

Wow, thanks. I’m not sure how I managed to miss those docs. Looks like exactly what I needed.

@mat-hek I’ve successfully wired up my first pipeline. It seems like it almost works, but I’ve run into an issue. My stream only supports interleaving; however, the membrane-element-rtp-h264 element does not support interleaving.

There is only one channel, so I tried to hack it together by just filtering out the interleaving packets with something along the lines of this:

  defp process(state, buffer)

  defp process(%{length: l} = state, buffer) when is_number(l) do
    buffer = state.buffer <> buffer
    if byte_size(buffer) >= l do

      <<chunk::binary-size(l), rest::binary>> = buffer
      actions = [buffer: {:output, %Buffer{payload: chunk}}, redemand: :output]
      {{:ok, actions}, %{length: nil, buffer: rest}}

    else

      {{:ok, demand: {:input, state.length}}, %{state | buffer: buffer}}

    end
  end

  defp process(state, <<36, _channel::integer-8, length::integer-16, rest::binary>>) do
    process(%{state | length: length}, rest)
  end

but that didn’t seem to work. In the logs i can see:

[h264 @ 0x7fbee401ed00] non-existing PPS 0 referenced
[h264 @ 0x7fbee401ed00] decode_slice_header error
[h264 @ 0x7fbee401ed00] no frame!

and then i get an error:

13:14:04.542 [error] GenServer #PID<0.281.0> terminating
** (Membrane.ActionError) Error while handling :split action:
Unknown error: :send_pkt
Callback: Membrane.Element.FFmpeg.H264.Decoder.handle_process_list
Action args: {:handle_process,
 [
   [
     :input,
     %Membrane.Buffer{
       metadata: %{},
       payload: <<0, 0, 0, 1, 101, 136, 128, 16, 0, 12, 255, 245, 154, 34, 103,
         162, 245, 12, 56, 225, 60, 222, 189, 150, 153, 78, 16, 77, 254, 201,
         165, 53, 240, 253, 3, 133, 170, 112, 0, 254, 178, 211, 19, ...>>
     }
   ]
 ]}

I think my issue is that my deinterleaver is actually only outputting RTP frames, and the h264 frames are being dropped? Are there plans to add “official” support for interleaving? I’d be happy to help contribute it, but I’m having a hard time determining where to add it. Wireshark calls it an RTSP interleaved frame, so I assumed it could be added to the RTSP source, but for the deinterleaver to work properly, it needs to decode the RTP and maybe even the h264 frames. Any help would be appreciated.

What kind of interleaving do you mean? What exactly is interleaved? As far as I understand, h264 RTP interleaved mode works a bit differently than you assume: it’s not about sending multiple h264 channels in one stream, but about sending NALUs in an order different from the one in which they were encoded. For example, if your stream is
A B C D E F G H
it can be sent as
A D G B E H C F

After receiving, the original order needs to be restored. This aims to avoid losing too many subsequent frames at once. It is not a widely supported feature, so it’s quite strange that your camera supports only that. This is described in RFC 6184.

To support this, you would need to adjust the depayloader. The depayloader gets the RTP packets’ payloads (not entire RTP packets - those are parsed by the RTP parser, one ‘step’ before) and outputs an h264 stream. The stream can be payloaded in different modes. Currently, the two most commonly used ones are supported: FU-A and STAP-A; however, it’s possible that your camera uses another. Depending on the mode, there are ways to get the Decoding Order Number, thanks to which you can order the NALUs properly. You definitely shouldn’t decode h264 there.
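To make the modes above concrete, here is a sketch of classifying an RTP h264 payload by its first byte, following the NAL unit type table in RFC 6184 (the module and function names are mine; types 25-27 and 29 are the interleaved-mode packets that carry DON information):

```elixir
defmodule NALU do
  # The first payload byte is the NAL unit header:
  # forbidden bit (1), NRI (2), type (5).
  def kind(<<_f::1, _nri::2, type::5, _rest::binary>>) do
    case type do
      t when t in 1..23 -> {:single_nalu, t}
      24 -> :stap_a
      25 -> :stap_b   # interleaved mode, carries DON
      26 -> :mtap16   # interleaved mode
      27 -> :mtap24   # interleaved mode
      28 -> :fu_a
      29 -> :fu_b     # interleaved mode
    end
  end
end
```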

The error you ended up with basically means the decoder failed because of invalid input. The logs indicate that a PPS NALU is either missing or at an improper position in the stream. Make sure to enable the Membrane logger to have all the logs present.

Thanks for the reply.
I’ve started reading the RFC for deinterleaving. You are correct that I will not need to decode h264. The reason the camera requires interleaving is that it streams audio and video on the same RTSP session. The DESCRIBE method lists two available channels. I had assumed the packets were in order because Wireshark automatically reorders them based on the interleaving data. In reality, they are coming out of order. It looks like I will need to make my own depayloader to output my MPEG-4 audio data along with the h264 data. After reading the spec, it doesn’t look that hard to implement deinterleaving.

Oh, that’s another kind of interleaving :stuck_out_tongue: I hadn’t noticed you mentioned audio. In the case of RTSP+RTP, there are two ways of having two (e.g. audio and video) streams in one RTSP session:

  • having two RTP sessions
  • having them interleaved in one RTP session

To my knowledge, in either option both streams are payloaded and depayloaded separately - the RTP parser distinguishes the streams by SSRC and passes each to the proper depayloader. That case is fully supported and even implemented as our RTP demo :wink:
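As an illustration of that demuxing step, the fixed RTP header (RFC 3550) can be parsed like this to get the SSRC and payload type (a sketch, not the actual Membrane parser):

```elixir
defmodule RTPFixedHeader do
  # RFC 3550 fixed header:
  # V(2) P(1) X(1) CC(4) M(1) PT(7) seq(16) timestamp(32) ssrc(32)
  def parse(<<2::2, _p::1, _x::1, _cc::4, _m::1, pt::7,
              seq::16, ts::32, ssrc::32, payload::binary>>) do
    {:ok, %{payload_type: pt, sequence_number: seq, timestamp: ts, ssrc: ssrc, payload: payload}}
  end

  # anything not starting with version 2 is rejected - the same sort of
  # :wrong_version failure that an RTP parser reports on non-RTP input
  def parse(_), do: {:error, :wrong_version}
end
```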

Oh, interesting. Thanks for the heads up. Maybe I can get rid of my filter and just try using the RTP bin module. I haven’t given it a try yet as it looked like the bin feature was still a little new. Will report back.

Update: I tried this out:

  def handle_init(_) do
    spec = %ParentSpec{
      children: [
        rtsp: %RTSP{
          location: "rtsp://localhost:554/axis-media/media.amp/"
        },
        rtp: %RTP.Receiver{fmt_mapping: %{96 => "H264", 97 => "MPA"}}
      ],
      links: [
        link(:rtsp) 
        |> to(:rtp)
      ]
    }

    {{:ok, spec: spec}, %{mpa: nil, h264: nil}}
  end

and it failed with:

09:34:15.033 [error] GenServer #PID<0.287.0> terminating
** (Membrane.ActionError) Error while handling :split action:
Unknown error: :wrong_version
Callback: Membrane.Element.RTP.Parser.handle_process_list
Action args: {:handle_process,
 [
   [
     :input,
     %Membrane.Buffer{
       metadata: %{},
       payload: <<36, 0, 5, 120, 128, 96, 54, 103, 86, 115, 59, 168, 242, 53,
         181, 184, 124, 133, 136, 128, 24, 0, 12, 255, 245, 154, 34, 103, 162,
         245, 12, 56, 225, 60, 222, 189, 150, 153, 78, 16, 77, 254, 201, ...>>
     }
   ]
 ]}

In that payload, the <<36, 0, 5, 120>> is the RTP interleave header: <<36, channel_id::integer-8, rtp_packet_length::integer-16>>, so it doesn’t look like it’s currently supported, unless I’m missing an option somewhere.

Ok, that’s an interleaving I haven’t heard about yet - not only are audio and video interleaved, but also RTSP packets. I’ve done some quick research and it seems I finally understand your first post :stuck_out_tongue: So, based on RFC 7826, it looks like we have the following types of packets:

  • RTP packets (prepended with this four-byte header)
  • RTSP packets (plain text)
  • possibly RTCP packets (prepended by the same header as RTP, but with the channel id incremented by one, as far as I understood)

So I think the parsing should be something like:

def dissect(<<36, rtp_channel::integer-8, length::integer-16, chunk::binary-size(length), rest::binary>>, rtp_channel) do
  # chunk is an RTP packet, forward to RTP parser
end

def dissect(<<36, rtcp_channel::integer-8, length::integer-16, chunk::binary-size(length), rest::binary>>, rtp_channel) when rtcp_channel == rtp_channel+1 do
  # chunk is an RTCP packet - not yet supported in Membrane - ignore or extract some information if needed
end

def dissect(<<36, _::binary>>, _rtp_channel) do
  # request more data
end

def dissect(rtsp_packet, _rtp_channel) do
  # got RTSP packet - even if not needed, has to be parsed to identify the beginning of the next packet,
  # that may be RTP. https://github.com/membraneframework/membrane-protocol-rtsp/blob/master/lib/rtsp/response.ex
  # may be helpful. You can also try skipping until the next 36 byte, but not sure it will work.
end

This should be done before the RTP parser (or before the RTP receiver bin). I don’t know RTSP that thoroughly, but hopefully this will finally be helpful.

Thanks for the input. Should I implement this as a Membrane Filter or something else? I tried a filter before, but I second-guessed myself.

As a filter, I think. So that you can connect a TCP source to the input and RTP bin to the output.
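Wiring-wise, that could look something like this sketch, reusing the spec shape from earlier in the thread (the TCP source module, its options, and the `Deinterleave` filter name are placeholders):

```elixir
# Hypothetical pipeline spec: TCP source -> deinterleaving filter -> RTP bin.
spec = %ParentSpec{
  children: [
    tcp: %TCP.Source{host: "camera.local", port: 554},
    deinterleave: Deinterleave,
    rtp: %RTP.Receiver{fmt_mapping: %{96 => "H264", 97 => "MPA"}}
  ],
  links: [
    link(:tcp) |> to(:deinterleave) |> to(:rtp)
  ]
}
```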

This looks pretty awesome, excited to see the result @ConnorRigby

I tried adding a single filter between the TCP socket and the RTP bin:

defmodule Disect do
  use Membrane.Filter
  alias Membrane.Buffer

  def_input_pad :input, caps: :any, demand_unit: :buffers
  def_output_pad :output, caps: :any

  @impl true
  def handle_init(state) do
    state = Map.merge(state || %{}, %{
      buffer: <<>>
    })
    {:ok, state}
  end

  @impl true
  def handle_demand(:output, size, :buffers, _ctx, state) do
    {{:ok, demand: {:input, size}}, state}
  end

  @impl true
  def handle_process(:input, %{payload: payload}, _ctx, state) do
    disect(state.buffer <> payload, [], state)
  end

  def disect(<<36, 0::integer-8, length::integer-16, chunk::binary-size(length), rest::binary>>, acc, state) do
    # chunk is an RTP packet, forward to RTP parser
    disect(rest, acc ++ [%Buffer{payload: chunk}], state)
  end

  # ignoring non video packets for simplicity
  def disect(<<36, channel::integer-8, length::integer-16, _chunk::binary-size(length), rest::binary>>, acc, state) do
    IO.puts "ignoring channel: #{channel}"
    disect(rest, acc, state)
  end

  # this chunk is incomplete - need to request more data
  def disect(<<36, _::binary>> = incomplete, acc, state) do
    {{:ok, [buffer: {:output, acc}, redemand: :output]}, %{state | buffer: incomplete}}
  end

  def disect(<<>>, acc, state) do
    {{:ok, [buffer: {:output, acc}, redemand: :output]}, %{state | buffer: <<>>}}
  end
end

but I’m still getting an error similar to what I received before:

09:00:54.691 [error] GenServer #PID<0.403.0> terminating
** (Membrane.ActionError) Error while handling :split action:
Unknown error: :send_pkt
Callback: Membrane.Element.FFmpeg.H264.Decoder.handle_process_list
Action args: {:handle_process,
 [
   [
     :input,
     %Membrane.Buffer{
       metadata: %{},
       payload: <<0, 0, 0, 1, 101, 136, 128, 16, 0, 12, 255, 245, 154, 34, 103,
         162, 245, 12, 56, 225, 60, 222, 189, 150, 153, 78, 16, 77, 254, 201,
         165, 53, 240, 253, 3, 133, 170, 112, 0, 254, 178, 211, 19, ...>>
     }
   ]
 ]}

The issue I have here is that in my Wireshark dump, those bytes don’t exist in any of the packets. I have the Wireshark pcap if you are interested. Any ideas?

UPDATE:
I found this line that adds the first 4 bytes onto the head of that payload, but I still can’t find even 101, 136, 128, 16 or any other portion of this payload in the pcap.

The payload <<0, 0, 0, 1, 101, 136, ... is an h264 frame that has been pieced together from RTP packets. The RFC for h264 over RTP explains the process: basically, you have to take apart the first and second bytes to get the fragment type and NAL type, then put them together, so you will not find the first couple of bytes in the RTSP dump.
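The rebuild step described above can be sketched for FU-A fragments (type 28) per RFC 6184 §5.8 - the F/NRI bits come from the FU indicator and the type bits from the FU header; module and function names here are mine:

```elixir
defmodule FUA do
  # Start fragment (S bit set): reconstruct the original NAL header from
  # the FU indicator (F, NRI) and the FU header (type). The depayloader
  # then prepends the Annex B start code <<0, 0, 0, 1>> itself.
  def unpack(<<f::1, nri::2, 28::5, 1::1, _e::1, _r::1, type::5, payload::binary>>) do
    {:start, <<f::1, nri::2, type::5>> <> payload}
  end

  # middle/end fragments are appended to the frame as-is
  def unpack(<<_f::1, _nri::2, 28::5, 0::1, e::1, _r::1, _type::5, payload::binary>>) do
    {if(e == 1, do: :end, else: :middle), payload}
  end
end
```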

It’s been some time since I wrote an RTSP client, but I think the disect function needs to double-check that it has actually got length bytes. And since you are dealing with an interleaved stream, you should not ignore non-video packets like that. I think what happens is that the Erlang TCP buffer size is usually bigger than one RTSP packet, so you will get multiple RTSP packets every time, which means you need to write the logic to split up RTSP packets.

Thanks for the info. My disect function is ensuring that the packet lengths are correct as far as I can tell. Everything from {:tcp, data} is buffered and can be parsed successfully as RTP packets, I think. The recursive function takes care of this (I’ve added more comments):

  # complete packet verified length
  def disect(<<36, 0::integer-8, length::integer-16, chunk::binary-size(length), rest::binary>>, acc, state) do
    # chunk is an RTP packet, forward to RTP parser
    # rest should be the beginning of the next packet, it may not be complete
    disect(rest, acc ++ [%Buffer{payload: chunk}], state)
  end

  # this has never actually happened in my tests because my camera is currently
  # configured to only output one channel.
  def disect(<<36, channel::integer-8, length::integer-16, _chunk::binary-size(length), rest::binary>>, acc, state) do
    # _chunk is a RTP packet, but we don't need it
    IO.puts "ignoring channel: #{channel}"
    disect(rest, acc, state)
  end

  # this packet is not complete. Stop recursion, dispatch complete packets, buffer the incomplete data
  def disect(<<36, _::binary>> = incomplete, acc, state) do
    # need to request more data
    {{:ok, [buffer: {:output, acc}, redemand: :output]}, %{state | buffer: incomplete}}
  end

If you buffer the incomplete packet… the next chunk of TCP bytes is not going to start with the RTSP magic byte 36, and how would you know how many bytes are left to read for this incomplete packet?

Edit: thanks for the added comments - I see now that your code should work.

It seems the problem now is decoding h264? I suggest you print the size of your frames. H264 should have a few small frames (SPS, PPS, etc.) at the beginning before the IDR frame, followed by many smaller P frames. If you don’t have the SPS/PPS frames, that’s what the “non-existing PPS 0 referenced” error is about.
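A quick way to do that check, assuming 4-byte Annex B start codes (some streams use 3-byte ones, so adjust if needed; the module name is mine):

```elixir
defmodule NALUScan do
  # Split an Annex B h264 stream on 4-byte start codes and report the
  # NAL unit type of each NALU: SPS is 7, PPS is 8, IDR is 5.
  def types(stream) do
    stream
    |> :binary.split(<<0, 0, 0, 1>>, [:global, :trim_all])
    |> Enum.map(fn <<_f::1, _nri::2, type::5, _::binary>> -> type end)
  end
end
```

Running it on the decoder input should show a 7 and an 8 before the first 5; if the list starts with a 5, the decoder never saw the parameter sets.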

I think I must be missing something, so I made a really dumbed-down receiver for RTP data. I tested this out just now with Wireshark, and every RTP frame that Wireshark sees, this code sees. I compared the RTP packet sequence_numbers and they are in order and complete.

# start here
def run() do
   # imagine rtsp SETUP and PLAY request here
   socket = do_rtsp_setup_stuff()
   # enter the buffering loop
   loop(socket, <<>>)
end


def loop(socket, old_buffer_from_last_disect) do
   # get new unparsed data from the socket 
   data = recv(socket)
   # disect the data starting with the unparsed data from last loop
   {complete_packets, incomplete_packets} = disect(old_buffer_from_last_disect <> data, [])
   # complete_packets would be handed to the RTP parser here;
   # the unparsed tail is carried over to the next iteration
   loop(socket, incomplete_packets)
end

# complete packet verified length
  def disect(<<36, 0::integer-8, length::integer-16, chunk::binary-size(length), rest::binary>>, complete_packets) do
    # chunk is an RTP packet, forward to RTP parser
    # rest should be the beginning of the next packet, it may not be complete
    packet = RTP.decode!(chunk)
    disect(rest, complete_packets ++ [packet])
  end

  # this has never actually happened in my tests because my camera is currently configured to only output
  # one channel currently. 
  def disect(<<36, channel::integer-8, length::integer-16, _chunk::binary-size(length), rest::binary>>, complete_packets) do
    # _chunk is a RTP packet, but we don't need it
    IO.puts "ignoring channel: #{channel}"
    disect(rest, complete_packets)
  end

  # this packet is not complete. Stop recursion, dispatch complete packets, buffer the incomplete data
  def disect(<<36, _::binary>> = incomplete_packets, complete_packets) do
    # need to request more data
    {complete_packets, incomplete_packets}
  end

I have a vague feeling your issue is that Membrane is not prepending SPS and PPS in front of the IDR frame. It is a weird thing with ffmpeg; I have had many headaches with this before. Just a thought - I’m probably wrong, as I have basically no working knowledge of Membrane.

@xlphs that’s true, Membrane won’t add any SPS or PPS. However, they should already be present in the raw (containerless) h264 afaik. Maybe there’s a possibility to make the camera inject them?
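If the camera can’t be made to inject them, one common workaround is to cache the parameter sets as they pass through and re-emit them before each IDR. A per-NALU sketch (not a Membrane element; module and function names are mine, and NALUs are assumed to arrive without start codes):

```elixir
defmodule ParamSets do
  # Cache SPS (type 7) and PPS (type 8) NALUs as they pass through and
  # prepend them to every IDR (type 5), so the decoder always has them.
  def inject(<<_::3, 7::5, _::binary>> = sps, cache), do: {[sps], %{cache | sps: sps}}
  def inject(<<_::3, 8::5, _::binary>> = pps, cache), do: {[pps], %{cache | pps: pps}}

  def inject(<<_::3, 5::5, _::binary>> = idr, %{sps: sps, pps: pps} = cache)
      when sps != nil and pps != nil do
    {[sps, pps, idr], cache}
  end

  def inject(nalu, cache), do: {[nalu], cache}
end
```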