How to take a file source and create a HLS stream in Membrane?

Hi, I am having trouble setting up Membrane to take a file source and create a HLS stream. I would be happy for any help! Here is my pipeline module:

defmodule Project.Media.Pipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(video_file) do
    video_file = Path.join([ File.cwd!, video_file])
    children = [
      file_source: %Membrane.File.Source{location: video_file},
      audio_parser: %Membrane.AAC.Parser{out_encapsulation: :none},
      video_parser: %Membrane.H264.FFmpeg.Parser{
        framerate: {25, 1},
        attach_nalus?: true
      },
      audio_payloader: Membrane.MP4.Payloader.AAC,
      video_payloader: Membrane.MP4.Payloader.H264,
      cmaf: %Membrane.MP4.Muxer.CMAF{segment_duration: Membrane.Time.seconds(2)},
      sink: %Membrane.HTTPAdaptiveStream.Sink{
        manifest_module: Membrane.HTTPAdaptiveStream.HLS,
        target_window_duration: 30 |> Membrane.Time.seconds(),
        target_segment_duration: 2 |> Membrane.Time.seconds(),
        persist?: true,
        storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
          directory: "/tmp/out"
        }
      }
    ]

    links = [
      link(:file_source) |> to(:video_parser) |> to(:video_payloader) |> to(:cmaf),
      link(:file_source) |> to(:audio_parser) |> to(:audio_payloader) |> to(:cmaf),
      link(:cmaf) |> to(:sink)
    ]

    spec = %ParentSpec{
      children: children,
      links: links
    }

    {{:ok, spec: spec}, %{}}
  end
end

Here are membrane related dependencies that I have in mix.exs file

      {:membrane_core, "~> 0.8.0"},
      {:membrane_mp4_plugin, "> 0.0.0"},
      {:membrane_http_adaptive_stream_plugin, "> 0.0.0"},
      {:membrane_file_plugin, "~> 0.8.0"},
      {:membrane_h264_ffmpeg_plugin, "~> 0.16.1"},
      {:membrane_aac_plugin, "~> 0.11.0"}

I used this video file as a example when trying my code: https://www.pexels.com/video/a-red-ferris-wheel-3509314/

Below are the error messages I got:

** (EXIT from #PID<0.500.0>) shell process exited with reason: exited in: GenServer.call(#PID<0.532.0>, {Membrane.Core.Message, :handle_link, [:output, %Membrane.Core.Parent.Link.Endpoint{child: :file_source, pad_props: [], pad_ref: :ou
tput, pad_spec: :output, pid: #PID<0.532.0>}, %Membrane.Core.Parent.Link.Endpoint{child: :audio_parser, pad_props: [], pad_ref: :input, pad_spec: :input, pid: #PID<0.533.0>}, nil], []}, 5000)                                              
   ** (EXIT) time out
[error] GenServer #PID<0.533.0> terminating
** (stop) exited in: GenServer.call(#PID<0.532.0>, {Membrane.Core.Message, :handle_link, [:output, %Membrane.Core.Parent.Link.Endpoint{child: :file_source, pad_props: [], pad_ref: :output, pad_spec: :output, pid: #PID<0.532.0>}, %Membrane.Core.Parent.Link.Endpoint{child: :audio_parser, pad_props: [], pad_ref: :input, pad_spec: :input, pid: #PID<0.533.0>}, %{accepted_caps: :any, availability: :always, demand_unit: :buffers, direction: :input, mode: :pull, name: :input, options: nil}], []}, 5000)
    ** (EXIT) time out
    (elixir 1.12.1) lib/gen_server.ex:1024: GenServer.call/3
    (membrane_core 0.8.2) lib/membrane/core/child/pad_controller.ex:45: Membrane.Core.Child.PadController.handle_link/5
    (membrane_core 0.8.2) lib/membrane/core/element.ex:145: Membrane.Core.Element.handle_call/3
    (stdlib 3.14.2) gen_server.erl:715: :gen_server.try_handle_call/4
    (stdlib 3.14.2) gen_server.erl:744: :gen_server.handle_msg/6
    (stdlib 3.14.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.532.0>): {Membrane.Core.Message, :handle_link, [:input, %Membrane.Core.Parent.Link.Endpoint{child: :audio_parser, pad_props: [], pad_ref: :input, pad_spec: :input, pid: #PID<0.533.0>}, %Membrane.Core.Parent.Link.Endpoint{child: :file_source, pad_props: [], pad_ref: :output, pad_spec: :output, pid: #PID<0.532.0>}, nil], []}
State: %Membrane.Core.Element.State{delayed_demands: #MapSet<[]>, internal_state: %{in_encapsulation: :ADTS, leftover: "", out_encapsulation: :none, samples_per_frame: 1024, timestamp: 0}, module: Membrane.AAC.Parser, name: :audio_parser, pads: %{data: %{}, dynamic_currently_linking: [], info: %{input: %{accepted_caps: :any, availability: :always, demand_unit: :buffers, direction: :input, mode: :pull, name: :input, options: nil}, output: %{accepted_caps: Membrane.AAC, availability: :always, direction: :output, mode: :pull, name: :output, options: nil}}}, parent_pid: #PID<0.530.0>, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_state: :stopped}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[]>}, supplying_demand?: false, synchronization: %{clock: nil, latency: 0, parent_clock: #PID<0.531.0>, stream_sync: :membrane_no_sync, timers: %{}}, type: :filter}
Client #PID<0.532.0> is alive

    (elixir 1.12.1) lib/code/identifier.ex:169: Code.Identifier.inspect_as_key/1
    (elixir 1.12.1) lib/inspect.ex:225: Inspect.List.keyword/2
    (elixir 1.12.1) lib/inspect/algebra.ex:408: Inspect.Algebra.container_each/6
    (elixir 1.12.1) lib/inspect/algebra.ex:385: Inspect.Algebra.container_doc/6
    (elixir 1.12.1) lib/inspect.ex:226: Inspect.List.keyword/2
    (elixir 1.12.1) lib/inspect/algebra.ex:408: Inspect.Algebra.container_each/6
    (elixir 1.12.1) lib/inspect/algebra.ex:385: Inspect.Algebra.container_doc/6
    (elixir 1.12.1) lib/inspect/algebra.ex:286: Inspect.Algebra.to_doc/2
    (elixir 1.12.1) lib/kernel.ex:2203: Kernel.inspect/2
    (membrane_core 0.8.2) lib/membrane/core/element/lifecycle_controller.ex:89: Membrane.Core.Element.LifecycleController.handle_shutdown/2
    (membrane_core 0.8.2) lib/membrane/core/element.ex:124: Membrane.Core.Element.terminate/2
    (stdlib 3.14.2) gen_server.erl:727: :gen_server.try_terminate/3
    (stdlib 3.14.2) gen_server.erl:912: :gen_server.terminate/10
    (stdlib 3.14.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
[debug] [:audio_payloader] Terminating element possibly not prepared for termination as it was in state :stopped.
Reason: {:shutdown, :parent_crash}"

[error] GenServer #PID<0.532.0> terminating
** (stop) exited in: GenServer.call(#PID<0.533.0>, {Membrane.Core.Message, :handle_link, [:input, %Membrane.Core.Parent.Link.Endpoint{child: :audio_parser, pad_props: [], pad_ref: :input, pad_spec: :input, pid: #PID<0.533.0>}, %Membrane.Core.Parent.Link.Endpoint{child: :file_source, pad_props: [], pad_ref: :output, pad_spec: :output, pid: #PID<0.532.0>}, nil], []}, 5000)
    ** (EXIT) time out
    (elixir 1.12.1) lib/gen_server.ex:1024: GenServer.call/3
    (membrane_core 0.8.2) lib/membrane/core/child/pad_controller.ex:45: Membrane.Core.Child.PadController.handle_link/5
    (membrane_core 0.8.2) lib/membrane/core/element.ex:145: Membrane.Core.Element.handle_call/3
    (stdlib 3.14.2) gen_server.erl:715: :gen_server.try_handle_call/4
    (stdlib 3.14.2) gen_server.erl:744: :gen_server.handle_msg/6
    (stdlib 3.14.2) proc_lib.erl:226: :proc_lib.init_p_do_apply/3
Last message (from #PID<0.530.0>): {Membrane.Core.Message, :handle_link, [:output, %Membrane.Core.Parent.Link.Endpoint{child: :file_source, pad_props: [], pad_ref: :output, pad_spec: :output, pid: #PID<0.532.0>}, %Membrane.Core.Parent.Link.Endpoint{child: :audio_parser, pad_props: [], pad_ref: :input, pad_spec: :input, pid: #PID<0.533.0>}, nil], []}
State: %Membrane.Core.Element.State{delayed_demands: #MapSet<[]>, internal_state: %{chunk_size: 2048, fd: nil, location: "video.mp4"}, module: Membrane.File.Source, name: :file_source, pads: %{data: %{output: %Membrane.Pad.Data{accepted_caps: :any, availability: :always, caps: nil, demand: 0, demand_unit: nil, direction: :output, end_of_stream?: false, input_buf: nil, mode: :pull, name: :output, options: nil, other_demand_unit: :buffers, other_ref: :input, pid: #PID<0.534.0>, ref: :output, start_of_stream?: false, sticky_messages: nil}}, dynamic_currently_linking: [], info: %{}}, parent_pid: #PID<0.530.0>, playback: %Membrane.Core.Playback{async_state_change: false, pending_state: nil, state: :stopped, target_state: :stopped}, playback_buffer: %Membrane.Core.Element.PlaybackBuffer{q: #Qex<[{Membrane.Core.Message, :demand, 40, [for_pad: :output]}]>}, supplying_demand?: false, synchronization: %{clock: nil, latency: 0, parent_clock: #PID<0.531.0>, stream_sync: :membrane_no_sync, timers: %{}}, type: :source}
Client #PID<0.530.0> is dead

Thank you for your time!

1 Like

Hi! This error is caused by the fact that you are trying to link the output of :file_source twice, which is illegal. It’s a bit unfortunate that this triggers such a nasty error and I have already created an issue about it.

Correct me if I’m wrong, but I think that your input file is an MP4. This would create a bit of a problem because, excluding the double-linking issue, you are feeding the bytestream in MP4 format into the AAC/H264 Parser, which both only accepts bytestreams containing media encoded with AAC/H264.

MP4 is a container containing both + some additional metadata. To extract AAC and H264 bytestreams, you would need to perform Demuxing with an element called Demuxer - which is unfortunately not yet implemented in any of the Membrane plugins.

You could use FFmpeg to perform the demuxing and then feed the resulting .h264 and .aac files into Membrane with two file sources.

3 Likes

Hi, thanks for the response!
I’ve been trying to work get it working using the documentation for some time using only the documentation, but it’s hard to get an overview of what can be done with membrane through the documentation alone. Is there a better way to see what membrane is currently capable of and such?
Have a nice day and thank for your time!

There is a list of Membrane Packages, although I don’t think it is up to date at the moment. Apart from that, if you know what you are looking for you can also just search through membraneframework organisation on github - that’s what I personally do.

Since yesterday I also realised that we don’t have a Demuxer for an ISOM MP4, but we do have one for MPEG-TS. If you were to use it, the pipeline would look something like this:

Mix.install([
  {:membrane_core, "~> 0.8.0"},
  {:membrane_mp4_plugin, "> 0.0.0"},
  {:membrane_http_adaptive_stream_plugin, "> 0.0.0"},
  {:membrane_file_plugin, "~> 0.8.0"},
  {:membrane_h264_ffmpeg_plugin, "~> 0.16.1"},
  {:membrane_aac_plugin, "~> 0.11.0"},
  {:membrane_mpegts_plugin, "~> 0.4.0"}
])

defmodule Pipeline do
  use Membrane.Pipeline

  @impl true
  def handle_init(_opts) do
    video_file = Path.join([File.cwd!, "video.ts"])
    children = [
      file_source: %Membrane.File.Source{location: video_file},
      demuxer: Membrane.MPEG.TS.Demuxer,
      audio_parser: %Membrane.AAC.Parser{out_encapsulation: :none},
      video_parser: %Membrane.H264.FFmpeg.Parser{
        framerate: {25, 1},
        attach_nalus?: true
      },
      audio_payloader: Membrane.MP4.Payloader.AAC,
      video_payloader: Membrane.MP4.Payloader.H264,
      cmaf: %Membrane.MP4.Muxer.CMAF{segment_duration: Membrane.Time.seconds(2)},
      sink: %Membrane.HTTPAdaptiveStream.Sink{
        manifest_module: Membrane.HTTPAdaptiveStream.HLS,
        target_window_duration: 30 |> Membrane.Time.seconds(),
        target_segment_duration: 2 |> Membrane.Time.seconds(),
        persist?: true,
        storage: %Membrane.HTTPAdaptiveStream.Storages.FileStorage{
          directory: "output"
        }
      }
    ]

    links = [
      link(:file_source) |> to(:demuxer),
      # Stream ids came from `ffprobe video.ts` - it is also possible to dynamically link when handling :mpeg_ts_stream_info notification
      # Stream #0:0[0x100]: Video: h264 (High) ([27][0][0][0] / 0x001B), yuv420p(tv, bt709, progressive), 1920x1080, 25 fps, 25 tbr, 90k tbn, 50 tbc
      # Stream #0:1[0x101](und): Audio: aac (LC) ([15][0][0][0] / 0x000F), 48000 Hz, stereo, fltp, 255 kb/s
      link(:demuxer) |> via_out(Pad.ref(:output, 0x100)) |> to(:video_parser) |> to(:video_payloader) |> to(:cmaf),
      link(:demuxer) |> via_out(Pad.ref(:output, 0x101)) |> to(:audio_parser) |> to(:audio_payloader) |> to(:cmaf),
      link(:cmaf) |> to(:sink)
    ]

    spec = %ParentSpec{
      children: children,
      links: links
    }

    {{:ok, spec: spec}, %{}}
  end

  @impl true
  def handle_notification({:mpeg_ts_stream_info, _info}, :demuxer, _ctx, state) do
    {{:ok, forward: {:demuxer, :pads_ready}}, state}
  end

  @impl true
  def handle_notification(_notification, _element, _ctx, state), do: {:ok, state}

  # Detect that processing has finished and terminate the pipeline
  @impl true
  def handle_element_end_of_stream({:sink, _pad}, _ctx, state) do
    __MODULE__.stop_and_terminate(self())
    {:ok, state}
  end

  @impl true
  def handle_element_end_of_stream({_element, _pad}, _ctx, state), do: {:ok, state}
end

{:ok, pid} = Pipeline.start()
monitor = Process.monitor(pid)
:ok = Pipeline.play(pid)

receive do
  {:DOWN, ^monitor, :process, _pid, reason} ->
    :ok
end

So you can transmux your MP4 into MPEG-TS using FFmpeg and then convert it to HLS using Membrane.

$ ffmpeg -i video.mp4 -c copy video.ts