Resample 8000Hz s16le for use in Bumblebee whisper model (16000Hz 32f)

Hi folks, what’s the easiest way to use Membrane to resample single-channel PCM at an 8000Hz sample rate (s16le) to something I can use in the Whisper Bumblebee model? (I believe 16000Hz f32?)

For simplicity, let’s say I’d like to do this in a LiveBook and I’ve got a binary of the raw 16-bit audio frames loaded in memory.

You should use our FFmpeg SWResample plugin. The docs contain an example similar to what you want to do, except that the input is a file instead of an in-memory binary. Let me know if you have any trouble!

Ah, thanks so much for the quick response. I guess I was wondering if there was a way to just resample from binary to binary without having to set up a pipeline, etc.

Will dig into how pipelines work :+1:
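For reference, a binary-to-binary conversion can be sketched in plain Elixir without any pipeline. This is only a naive illustration, not what the SWResample plugin does: it decodes s16le samples to floats and doubles the sample rate by linear interpolation, which is cruder than FFmpeg's resampler. The module name `NaiveResample` and all of its functions are hypothetical.

```elixir
defmodule NaiveResample do
  # Hypothetical helper, not part of Membrane or Bumblebee.
  # Converts mono s16le PCM to f32le and doubles the sample rate
  # (for example 8 kHz -> 16 kHz) using linear interpolation.

  # Decode little-endian signed 16-bit samples into floats in [-1.0, 1.0).
  def s16le_to_floats(binary) do
    for <<sample::signed-little-integer-size(16) <- binary>>, do: sample / 32_768
  end

  # Emit each sample followed by the midpoint to its right-hand neighbour.
  # The last sample is simply repeated.
  def upsample_2x([]), do: []

  def upsample_2x(samples) do
    next = tl(samples) ++ [List.last(samples)]

    samples
    |> Enum.zip(next)
    |> Enum.flat_map(fn {a, b} -> [a, (a + b) / 2] end)
  end

  # Encode floats as little-endian 32-bit floats.
  def floats_to_f32le(samples) do
    for s <- samples, into: <<>>, do: <<s::float-little-size(32)>>
  end

  # Full conversion: s16le at rate R in, f32le at rate 2R out.
  def convert(binary) do
    binary |> s16le_to_floats() |> upsample_2x() |> floats_to_f32le()
  end
end
```

Assuming Nx is installed, the resulting binary could then be turned into a tensor with something like `Nx.from_binary(out, :f32)` before handing it to the Whisper serving. For anything beyond a quick experiment, a proper resampler such as the SWResample plugin is likely the better choice.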

Unfortunately, it might not work currently. I checked our GitHub and apparently there is a bug in our SWResample plugin :(

Thanks so much for your help, I sort-of got it working in a LiveBook notebook now using this code

Mix.install([{:membrane_ffmpeg_swresample_plugin, "~> 0.16.1"}])

defmodule Resampling.Pipeline do
  use Membrane.Pipeline

  alias Membrane.FFmpeg.SWResample.Converter
  alias Membrane.{File, RawAudio}

  @impl true
  def handle_init(_ctx, _opts) do
    structure = [
      child(:file_src, %File.Source{location: "/tmp/elixir_converted_from_twilio.pcm"})
      |> child(:converter, %Converter{
        input_stream_format: %RawAudio{channels: 1, sample_format: :s16le, sample_rate: 8_000},
        output_stream_format: %RawAudio{channels: 1, sample_format: :f32le, sample_rate: 16_000}
      })
      |> child(:file_sink, %File.Sink{location: "/tmp/elixir_converted_from_twilio_16khz_f32.pcm"})
    ]

    {[spec: structure, playback: :playing], nil}
  end

  @impl true
  def handle_element_end_of_stream(:file_sink, _pad, _ctx_, _state) do
    {[playback: :stopped], nil}
  end
end

{:ok, pid, pid2}  =  Resampling.Pipeline.start_link()

It creates the output file with the correct format and I can use it as input for bumblebee. However, I still have a couple of questions

  1. Is there an equivalent to the file plugin that lets me input and output a binary? I’m sorry if I missed this in the membrane docs somewhere

  2. This minimal notebook above somehow ends up with each cell in “aborted” state, even though the output file is successfully created and the output of the cell looks good:

21:38:46.614 [debug] Pipeline start link: module: Resampling.Pipeline,
pipeline options: nil,
process options: []


21:38:46.630 [debug] <0.282.0>/ New spec #Reference<0.3194694823.2069889028.48896>
structure: [%Membrane.ChildrenSpec.StructureBuilder{children: [{:file_sink, %Membrane.File.Sink{location: "/tmp/elixir_converted_from_twilio_16khz_f32.pcm"}, %{get_if_exists: false}}, {:converter, %Membrane.FFmpeg.SWResample.Converter{input_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 8000, sample_format: :s16le}, output_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 16000, sample_format: :f32le}}, %{get_if_exists: false}}, {:file_src, %Membrane.File.Source{location: "/tmp/elixir_converted_from_twilio.pcm", chunk_size: 2048}, %{get_if_exists: false}}], links: [%{from: :converter, from_pad: :output, from_pad_props: %{options: []}, to: :file_sink, to_pad: :input, to_pad_props: %{auto_demand_size: nil, min_demand_factor: nil, options: [], target_queue_size: nil, throttling_factor: 1, toilet_capacity: nil}}, %{from: :file_src, from_pad: :output, from_pad_props: %{options: []}, to: :converter, to_pad: :input, to_pad_props: %{auto_demand_size: nil, min_demand_factor: nil, options: [], target_queue_size: nil, throttling_factor: 1, toilet_capacity: nil}}], status: :done, from_pad: :output, from_pad_props: %{options: []}, to_pad: :input, to_pad_props: %{auto_demand_size: nil, min_demand_factor: nil, options: [], target_queue_size: nil, throttling_factor: 1, toilet_capacity: nil}, link_starting_child: :file_sink}]


21:38:46.632 [debug] <0.282.0>/ Starting children: [%Membrane.ChildEntry{name: :file_src, module: Membrane.File.Source, options: %Membrane.File.Source{location: "/tmp/elixir_converted_from_twilio.pcm", chunk_size: 2048}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.3194694823.2069889028.48896>, initialized?: false, ready?: false, terminating?: false}, %Membrane.ChildEntry{name: :converter, module: Membrane.FFmpeg.SWResample.Converter, options: %Membrane.FFmpeg.SWResample.Converter{input_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 8000, sample_format: :s16le}, output_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 16000, sample_format: :f32le}}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.3194694823.2069889028.48896>, initialized?: false, ready?: false, terminating?: false}, %Membrane.ChildEntry{name: :file_sink, module: Membrane.File.Sink, options: %Membrane.File.Sink{location: "/tmp/elixir_converted_from_twilio_16khz_f32.pcm"}, component_type: :element, pid: nil, clock: nil, sync: nil, spec_ref: #Reference<0.3194694823.2069889028.48896>, initialized?: false, ready?: false, terminating?: false}] 

21:38:46.632 [debug] <0.282.0>/ Starting child: name: :file_src, module: Membrane.File.Source

21:38:46.634 [debug] <0.282.0>/ subprocess supervisor Element start: :file_src
node: ,
module: Membrane.File.Source,
element options: %Membrane.File.Source{location: "/tmp/elixir_converted_from_twilio.pcm", chunk_size: 2048},
method: start


21:38:46.635 [debug] <0.282.0>/:file_src/ Initializing element: Membrane.File.Source, options: %Membrane.File.Source{location: "/tmp/elixir_converted_from_twilio.pcm", chunk_size: 2048}

21:38:46.637 [debug] <0.282.0>/ Starting child: name: :converter, module: Membrane.FFmpeg.SWResample.Converter

21:38:46.637 [debug] <0.282.0>/ subprocess supervisor Element start: :converter
node: ,
module: Membrane.FFmpeg.SWResample.Converter,
element options: %Membrane.FFmpeg.SWResample.Converter{input_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 8000, sample_format: :s16le}, output_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 16000, sample_format: :f32le}},
method: start


21:38:46.637 [debug] <0.282.0>/:converter/ Initializing element: Membrane.FFmpeg.SWResample.Converter, options: %Membrane.FFmpeg.SWResample.Converter{input_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 8000, sample_format: :s16le}, output_stream_format: %Membrane.RawAudio{channels: 1, sample_rate: 16000, sample_format: :f32le}}

21:38:46.637 [debug] <0.282.0>/ Starting child: name: :file_sink, module: Membrane.File.Sink

21:38:46.637 [debug] <0.282.0>/ subprocess supervisor Element start: :file_sink
node: ,
module: Membrane.File.Sink,
element options: %Membrane.File.Sink{location: "/tmp/elixir_converted_from_twilio_16khz_f32.pcm"},
method: start


21:38:46.637 [debug] <0.282.0>/:file_sink/ Initializing element: Membrane.File.Sink, options: %Membrane.File.Sink{location: "/tmp/elixir_converted_from_twilio_16khz_f32.pcm"}

21:38:46.637 [debug] <0.282.0>/:file_src/ Element initialized

21:38:46.638 [debug] <0.282.0>/:file_sink/ Element initialized

21:38:46.644 [debug] <0.282.0>/ Proceeding spec #Reference<0.3194694823.2069889028.48896> startup: initializing, dependent specs: MapSet.new([])

21:38:46.644 [debug] <0.282.0>/ Playing request, :stopped

21:38:46.645 [debug] <0.282.0>/ Setup

21:38:46.647 [debug] <0.282.0>/ Parent play

21:38:46.647 [debug] <0.282.0>/ Proceeding spec #Reference<0.3194694823.2069889028.48896> startup: initializing, dependent specs: MapSet.new([])

21:38:46.647 [debug] <0.282.0>/ Proceeding spec #Reference<0.3194694823.2069889028.48896> startup: initializing, dependent specs: MapSet.new([])

21:38:46.656 [debug] <0.282.0>/:converter/ Element initialized

21:38:46.656 [debug] <0.282.0>/ Proceeding spec #Reference<0.3194694823.2069889028.48896> startup: initializing, dependent specs: MapSet.new([])

21:38:46.656 [debug] <0.282.0>/ Spec #Reference<0.3194694823.2069889028.48896> status changed to initialized

21:38:46.656 [debug] <0.282.0>/ Spec #Reference<0.3194694823.2069889028.48896> status changed to linking internally

21:38:46.658 [debug] <0.282.0>/:file_src/ Element handle link on pad :output with pad :input of child :converter

21:38:46.659 [debug] <0.282.0>/:converter/ Element handle link on pad :input with pad :output of child :file_src

  3. Why are two pids returned?

  4. Why do I not need to call .play() on the pipeline, as the Membrane docs suggest here?

Thanks so much for your help, I really appreciate it <3

  1. Unfortunately we don’t have a plugin that allows a binary input or output, however it would be very simple to implement if you read the file plugin’s code, probably ~30-40 lines of code.
  2. The aborted state is the result of the pipeline crashing. I don’t know why the error isn’t shown in the cell output. There are two causes of the crash. The first is that as soon as you define a callback (such as handle_element_end_of_stream) for one input case, the default implementation is no longer used, so when :file_src reaches end of stream, Membrane calls Resampling.Pipeline.handle_element_end_of_stream(:file_src, _, _, _) and finds no matching clause. The fix is simple: keep your clause and add a catch-all clause after it:
    ...
    
    @impl true
    def handle_element_end_of_stream(:file_sink, _pad, _ctx, _state) do
      {[playback: :stopped], nil}
    end
    
    @impl true
    def handle_element_end_of_stream(_child, _pad, _ctx, _state) do
      {[], nil}
    end
    
    ...
    
    The second problem is returning the playback: :stopped action, which doesn’t exist in membrane core v0.11. You should return terminate: :normal instead.
  3. Functions that start a pipeline return {:ok, supervisor_pid, pipeline_pid} in case of success, because the pipeline is always spawned under a dedicated supervisor. The supervisor never restarts the pipeline, but it makes sure that the pipeline and its children terminate properly.
  4. The guide you’re referring to uses membrane core v0.7, and is unfortunately out of date : (. The play() function was replaced with returning the playback: :playing action, which you did in your handle_init. I’m sorry the guides are outdated, we decided it would be better for now to focus on releasing core 1.0 and rewrite the old guides after that, since then the API will not change every couple months.
  5. Try playing the files after resampling, since, as I mentioned, they might be corrupted due to a bug we currently have in our SWResample plugin ;( We’re working on fixing it as soon as possible!
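
The first cause in point 2 is ordinary Elixir behaviour and can be reproduced without Membrane. The sketch below assumes the default callbacks are injected by `use` and marked with `defoverridable`, which is the usual way to do it. `Defaults`, `OneClause`, `WithCatchAll` and `handle_eos/2` are hypothetical names used only for illustration.

```elixir
defmodule Defaults do
  # Hypothetical stand-in for what `use Membrane.Pipeline` provides:
  # a default callback that the using module may override.
  defmacro __using__(_opts) do
    quote do
      def handle_eos(_child, state), do: {[], state}
      defoverridable handle_eos: 2
    end
  end
end

defmodule OneClause do
  use Defaults

  # Defining any clause replaces the whole default function,
  # so from now on only :file_sink matches.
  def handle_eos(:file_sink, state), do: {[terminate: :normal], state}
end

defmodule WithCatchAll do
  use Defaults

  def handle_eos(:file_sink, state), do: {[terminate: :normal], state}

  # The catch-all clause restores the "do nothing" behaviour for other children.
  def handle_eos(_child, state), do: {[], state}
end
```

Calling `OneClause.handle_eos(:file_src, nil)` raises a `FunctionClauseError`, which mirrors the crash described above, while `WithCatchAll.handle_eos(:file_src, nil)` returns `{[], nil}`.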

Hope this helps, let me know if you need anything else!

Gotta say that the bug manifests itself only if you have a WAV bin at the end of the pipeline.

Feeding buffers directly from SWResample into Whisper works just fine :person_shrugging:

@jonast Here’s a repo with code that helped me immensely: lawik/membrane_transcription on GitHub (prototype transcription for Membrane)

Thanks so much both, that’s super useful. I can confirm that the resampled audio file plays as intended and the LiveBook no longer crashes

Will have a go at writing a BinarySrc plugin now

Cool! If you have any questions, be sure to ask!

I’ve got it working pretty quickly, but it’s pretty hacky. It just takes the whole Binary and sends it the first time it gets a demand, irrespective of size etc. I’m also not sure if it’s alright to not do any cleanup or handle anything other than demand (but I think it is alright)

defmodule Resampling.Binary.Source do
  use Membrane.Source
  require Membrane.Logger

  alias Membrane.{Buffer, RemoteStream}

  def_options raw_audio_binary: [
                spec: binary(),
                description: "Raw audio data"
              ],
              chunk_size: [
                spec: pos_integer(),
                default: 2048,
                description: "Size of chunks to process"
              ]

  def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}

  @impl true
  def handle_init(_ctx, %__MODULE__{raw_audio_binary: raw_audio_binary, chunk_size: size}) do
    Membrane.Logger.info("Init with #{byte_size(raw_audio_binary)} bytes of raw audio and chunk size #{size}")
    {[],
     %{
       raw_audio_binary: raw_audio_binary,
       chunk_size: size,
     }}
  end

  # @impl true
  # def handle_setup(_ctx, %{location: location} = state) do
  #   fd = @common_file.open!(location, :read)

  #   {[], %{state | fd: fd}}
  # end

  @impl true
  def handle_playing(_ctx, state) do
    {[stream_format: {:output, %RemoteStream{type: :bytestream}}], state}
  end

  @impl true
  def handle_demand(:output, _size, :buffers, _ctx, %{chunk_size: chunk_size} = state) do
    # Membrane.Logger.info(["handle demand", _size, :buffers])

    supply_demand(chunk_size, [redemand: :output], state)
  end

  def handle_demand(:output, size, :bytes, _ctx, state) do
    Membrane.Logger.info("handle demand, size #{size}, state #{inspect(state)}")
    supply_demand(size, [], state)
  end
    
  # @impl true
  # def handle_terminate_request(_ctx, state) do
  #   @common_file.close!(state.fd)

  #   {[terminate: :normal], %{state | fd: nil}}
  # end

  defp supply_demand(size, redemand, %{raw_audio_binary: raw_audio_binary} = state) do

    # actions = [end_of_stream: :output]
    actions = [buffer: {:output, %Buffer{payload: raw_audio_binary}}, end_of_stream: :output]

    # actions =
    #   case @common_file.binread!(fd, size) do

    #     <<payload::binary>> when byte_size(payload) == size ->

    #       [buffer: {:output, %Buffer{payload: payload}}] ++ redemand

    #     <<payload::binary>> when byte_size(payload) < size ->
    #       [buffer: {:output, %Buffer{payload: payload}}, end_of_stream: :output]

    #     :eof ->
    #       [end_of_stream: :output]
    #   end

    {actions, state}
  end
end
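
A less hacky variant would hand out one chunk per demand instead of the whole binary at once. The slicing logic can be sketched on its own, without Membrane. `Chunker` and `next_actions/2` are hypothetical names, and in a real element each payload would be wrapped in `%Membrane.Buffer{}` and the remaining binary kept in the element state.

```elixir
defmodule Chunker do
  # Hypothetical helper: given the remaining binary and a chunk size, returns
  # the action list for one demand plus the binary that is left over.
  # Payloads are plain binaries here; a real source would wrap them in
  # %Membrane.Buffer{payload: ...}.

  # Nothing left: only signal the end of the stream.
  def next_actions(<<>>, _size), do: {[end_of_stream: :output], <<>>}

  # More than one chunk left: emit a chunk and ask to be called again.
  def next_actions(binary, size) when byte_size(binary) > size do
    <<payload::binary-size(size), rest::binary>> = binary
    {[buffer: {:output, payload}, redemand: :output], rest}
  end

  # Last (possibly short) chunk: emit it and finish the stream.
  def next_actions(binary, _size) do
    {[buffer: {:output, binary}, end_of_stream: :output], <<>>}
  end
end
```

With this shape, `handle_demand` would only need to call the helper with the binary from the state and store the returned remainder.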

Ideally, I’d like to change this source to “push” data to the next elements in the pipeline, rather than having it pulled. In reality, I have a Phoenix project with a websocket connection over which I receive raw audio frames (from Twilio), which I’d then like to push into this resampling (and later, transcription) pipeline as they become available.

Any pointers on how best to accomplish that?

If you cannot / don’t want to handle back pressure, you should change the pad mode to push

  def_output_pad :output, accepted_format: %RemoteStream{type: :bytestream}, mode: :push

and not receive the demands at all, like the UDP source does. This of course works under the assumption that you won’t send more data than the pipeline can process.

Also, instead of passing data through options, you can send binaries to the source using regular messages and forward them through the output pad in the handle_info callback.
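
The message-forwarding idea can be modelled as plain functions to see which actions each callback would return. This is a dependency-free sketch, not a working element: `PushSourceLogic`, the `{:audio, payload}` and `:audio_done` message shapes, and the `:bytestream` placeholder are all assumptions. A real source would `use Membrane.Source`, declare the pad with `mode: :push`, take a `ctx` argument in each callback, and wrap payloads in `%Membrane.Buffer{}`. The sketch also assumes that buffers should not be sent before the stream format, so early frames are queued.

```elixir
defmodule PushSourceLogic do
  # Hypothetical model of the callbacks of a push-mode source.
  # Only plain data is built here; nothing is sent anywhere.

  def init, do: %{playing?: false, pending: []}

  # Once playing, announce the format first, then flush queued frames in order.
  def handle_playing(state) do
    buffers = for payload <- Enum.reverse(state.pending), do: {:buffer, {:output, payload}}
    {[stream_format: {:output, :bytestream}] ++ buffers, %{state | playing?: true, pending: []}}
  end

  # Frames arriving before playback starts are queued (newest first).
  def handle_info({:audio, payload}, %{playing?: false} = state) when is_binary(payload) do
    {[], %{state | pending: [payload | state.pending]}}
  end

  # While playing, every frame is forwarded immediately, with no demand involved.
  def handle_info({:audio, payload}, state) when is_binary(payload) do
    {[buffer: {:output, payload}], state}
  end

  # The sender signals that no more audio will arrive.
  def handle_info(:audio_done, state), do: {[end_of_stream: :output], state}
end
```

Since nothing here applies back pressure, this only holds up under the assumption stated above: the websocket must not deliver audio faster than the rest of the pipeline can process it.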