I have a simple Phoenix.Channel that sends Audio to a Membrane Pipeline. How do I send the Audio from the Pipeline to the source?
the channel:
defmodule AudioTranscriptionWeb.AudioChannel do
use Phoenix.Channel
def join("audio:stream", _payload, socket) do
{:ok, socket}
end
def handle_in("new_audio", %{"data" => audio_data}, socket) do
# Send the audio data to the Membrane Pipeline
AudioTranscription.StreamToFile.send_audio_data(audio_data)
{:noreply, socket}
end
end
Membrane Pipeline
defmodule AudioTranscription.StreamToFile do
use Membrane.Pipeline
alias Membrane.File.Sink
@impl true
def handle_init(_) do
spec =
child(AudioTranscription.Source)
|> child(%Membrane.File.Sink{location: "output.wav"})
{[spec: spec], %{}}
end
@impl true
def notify_child(AudioTranscription.Source, data) do
# How do I notify the Source
end
@impl true
def send_audio_data(audio_data) do
# Process the audio data
notify_child(AudioTranscription.Source, audio_data) # not working
# AudioTranscription.AudioSource.receive_audio_data(audio_data)
end
end
Membrane.Source
defmodule AudioTranscription.Source do
@moduledoc """
Membrane element counting incoming buffers.
Count of buffers divided by `divisor` (passed via `:input` pad options)
is sent as a `{:counter, number}` notification once every `interval`
(passed via element options).
"""
use Membrane.Source
alias Membrane.Buffer
alias Membrane.RawAudio
def_output_pad :output,
availability: :always,
flow_control: :manual,
accepted_format: _any
@impl true
def handle_init(_) do
{:ok, %{buffer: []}}
end
@impl true
def handle_demand(:output, _size, _unit, _ctx, state) do
{{:ok, buffer: state.buffer}, state}
end
@impl true
def handle_playing(_ctx, state) do
{{:ok, []}, state}
end
def handle_push_buffer(buffer, _ctx, state) do
{:ok, state}
end
def receive_audio_data(audio_data) do
send(self(), {:new_audio_data, audio_data})
end
@impl true
def handle_info({:new_audio_data, audio_data}, _ctx, state) do
buffer = %Buffer{payload: audio_data}
{{:ok, buffer: {:output, buffer}}, state}
end
def handle_parent_notification(notification, _context, _state) do
IO.inspect(notification)
end
end
Or should I use something else to achieve this? Please help, thank you.