Membrane and Whisper Audio

Hey folks,

i am trying to get Membrane and Whisper working together. For now, the goal is, to use Mebrane so stream a file into Whisper. I worked on the following Livebook but cannot figure out, why Whisper is not correctly transcribing.
When i pipe the output of the converter into portaudio it is very noisy. When i set the converter to s16le the PortAudio audio sounds fine! But when i use :f32le and write it to a PCM file and import it into Audacity, it sounds fine again, despite the PortAudio Sink is not.
Either way, Whisper is not translating correctly. Can you help me out, where the issue could be?

Mix.install(
  [
    {:membrane_core, "~> 0.12.7"},
    {:membrane_file_plugin, "~> 0.15.0"},
    {:membrane_mp4_plugin, "~> 0.27.0"},
    {:membrane_fake_plugin, "~> 0.10.0"},
    {:membrane_raw_audio_format, "~> 0.11.0"},
    {:membrane_ffmpeg_swresample_plugin,
     git: "https://github.com/membraneframework/membrane_ffmpeg_swresample_plugin",
     branch: "master"},
    {:membrane_aac_plugin, "~> 0.15.0"},
    {:membrane_aac_fdk_plugin, "~> 0.15.1"},
    {:membrane_portaudio_plugin, "~> 0.16.1"},
    {:kino_bumblebee, "~> 0.3.0"},
    {:exla, "~> 0.5.1"}
  ],
  config: [nx: [default_backend: EXLA.Backend], membrane_core: [logger: [verbose: false]]]
)
#dependecies:
brew install fdk-aac
brew install portaudio
model = "tiny"

{:ok, model_info} = Bumblebee.load_model({:hf, "openai/whisper-#{model}"})
{:ok, featurizer} = Bumblebee.load_featurizer({:hf, "openai/whisper-#{model}"})
{:ok, tokenizer} = Bumblebee.load_tokenizer({:hf, "openai/whisper-#{model}"})
{:ok, generation_config} = Bumblebee.load_generation_config({:hf, "openai/whisper-#{model}"})
generation_config = Bumblebee.configure(generation_config, max_new_tokens: 100)

generation_config = %{
  generation_config
  | forced_token_ids: [
      # tell whisper that spoken text is german
      {1, Bumblebee.Tokenizer.token_to_id(tokenizer, "<|de|>")},
      {2, Bumblebee.Tokenizer.token_to_id(tokenizer, "<|transcribe|>")}
    ]
}

serving =
  Bumblebee.Audio.speech_to_text(model_info, featurizer, tokenizer, generation_config,
    compile: [batch_size: 1],
    defn_options: [compiler: EXLA]
  )
Nx.Serving.start_link(serving: serving, name: WhisperServing)
defmodule WhisperSink do
  use Membrane.Sink
  require Logger
  def_input_pad(:input, demand_unit: :buffers, accepted_format: _any)

  @impl true
  def handle_playing(_ctx, state) do
    {[demand: :input], state}
  end

  @impl true
  def handle_write_list(:input, buffers, _ctx, state) do
    for buffer <- buffers do
      data =
        buffer.payload
        |> Nx.from_binary(:f32)

      Logger.info(inspect(Nx.Serving.batched_run(WhisperServing, data)))
    end

    {[demand: {:input, length(buffers)}], state}
  end
end
defmodule Pipeline do
  use Membrane.Pipeline

  @in_file "/path/to/video.mp4"

  @impl true
  def handle_init(_context, _opts) do
    structure = [
      child(:file_source, %Membrane.File.Source{location: @in_file, seekable?: true})
      |> child(:demuxer, %Membrane.MP4.Demuxer.ISOM{optimize_for_non_fast_start?: true})
      |> via_out(Pad.ref(:output, 2))
      |> child(:depayloader_audio, Membrane.MP4.Depayloader.AAC)
      |> child(:audio_parser, %Membrane.AAC.Parser{
        in_encapsulation: :none,
        out_encapsulation: :ADTS
      })
      |> child(:aac_decoder, Membrane.AAC.FDK.Decoder)
      |> child(:converter, %Membrane.FFmpeg.SWResample.Converter{
        output_stream_format: %Membrane.RawAudio{
          sample_format: :f32le,
          sample_rate: 16_000,
          channels: 1
        }
      })
      # |> child(:speaker, Membrane.PortAudio.Sink),
      |> child(:whisper, WhisperSink),
      get_child(:demuxer)
      |> via_out(Pad.ref(:output, 1))
      |> child(:fake, Membrane.Fake.Sink.Buffers)
    ]

    {[spec: structure, playback: :playing], %{}}
  end

  @impl true
  def handle_element_end_of_stream(:file_sink, _pad, _ctx_, _state) do
    IO.inspect("terminate")
    {[terminate: :normal], nil}
  end

  @impl true
  def handle_element_end_of_stream(_child, _pad, _ctx, _state) do
    {[], nil}
  end

  def handle_child_notification(notification, _element, _context, state) do
    # dbg(notification)
    {[], state}
  end
end
{:ok, _, _} = Pipeline.start_link()

Hi there, I’ll look into the converter & port audio issue. Regarding speech-to-text, have you tried this example?

1 Like

Hi @mat-hek,

oh wow, thanks for the LiveBook example! I did not find it. After some trial and error and the membrane_transcription repo from @lawik i found out, that i obviously need to create a buffer, otherwise the length of the Membrane.Buffer.payload is to small for Whisper. The example would have saved me some time :slight_smile:

I currently got this to work with a Whisper Serving startet in the Supervision tree and used manual demand. Otherwise the buffer would be too big for the Whisper model, because it can only handle a maximum chunk of 30s.

defmodule MembraneWhisper.WhisperSink do
  use Membrane.Sink
  alias Membrane.RawAudio
  require Membrane.Logger

  @sample_rate 16_000
  # sample format is `:f32le`, so each format is written on 32 bits = 4 bytes
  @bytes_per_sample 4
  @channels 1
  # seconds
  @chunk_duration 10
  @chunk_size @sample_rate * @bytes_per_sample * @channels * @chunk_duration

  def_input_pad(:input,
    accepted_format: %RawAudio{
      channels: @channels,
      sample_rate: @sample_rate,
      sample_format: :f32le
    },
    flow_control: :manual
  )

  @impl true
  def handle_init(_ctx, _options) do
    state = %{
      samples: <<>>,
      transcribing?: false
    }

    {[], state}
  end

  @impl true
  def handle_playing(_ctx, state) do
    {[demand: :input], state}
  end

  @impl true
  def handle_buffer(:input, %Membrane.Buffer{} = buffer, _context, state) do
    all_samples = state.samples <> buffer.payload

    state =
      if byte_size(all_samples) > @chunk_size and not state.transcribing? do
        transcribe(all_samples)
        %{state | samples: <<>>, transcribing?: true}
      else
        %{state | samples: all_samples}
      end

    if byte_size(all_samples) > @chunk_size do
      {[], state}
    else
      {[demand: :input], state}
    end
  end

  defp transcribe(data) do
    send_to = self()

    Task.start(fn ->
      model_input = Nx.from_binary(data, :f32)
      results = Nx.Serving.batched_run(MembraneWhisper.WhisperServing, model_input)
      transcription = Enum.map_join(results.results, " ", & &1.text)
      send(send_to, {:transcribed, transcription})
    end)
  end

  def handle_info({:transcribed, transcription}, _ctx, state) do
    IO.inspect(transcription)
    state = %{state | transcribing?: false}
    {[demand: :input], state}
  end

  def handle_info(other_msg, _, state) do
    Membrane.Logger.debug("Unknown message received: #{inspect(other_msg)}")
    {[], state}
  end
end

Again, thank you for your help!

1 Like

Yeah, we’ll make those livebooks more discoverable. If the payload is too small/big, you can just concat/split it in the WhisperSink as well :wink: However, to use automatic demands, you’d also have to move the logic from the task to the element or await on the task - otherwise, you can be flooded with data :ocean:

BTW, you can use functions from Membrane.RawAudio — Membrane: raw audio format v0.11.0 instead of hardcoding sample rate & friends. Happy hacking :wink:

2 Likes

PortAudio bug fixed in Release v0.16.2 · membraneframework/membrane_portaudio_plugin · GitHub :wink:

2 Likes