So I’m getting back a streaming response from a whisper api I set up to use fly.io GPU’s. So far I can call it, get back chunks as they come in using req’s into:
parameter and put the text and timestamp on a Phoenix LiveView. What I’m struggling with is how to capture all the incoming chunks to a list so I can save the text segments to Postgres and/or to object storage so I don’t have to transcript that particular audio file again.
# calling transcribe_audio with the url
def async_transcribe(%Episode{} = episode) do
Task.Supervisor.start_child(NormanAi.TaskSupervisor, fn ->
result =
NormanAi.Audio.transcribe_audio(episode.enclosure_url, fn ss, text ->
segment = %Episode.Transcript.Segment{ss: ss, text: text}
broadcast!(episode.id, {segment, episode.id})
IO.inspect("BROADCASTING")
segment
end)
IO.inspect(result)
# Repo.update_all(from(e in Episode, where: e.id == ^episode.id),
# set: [transcript: %Episode.Transcript{segments: segments}]
# )
end)
end
Then the work is done here
def transcribe_audio(url, callback) do
# internal fly.io url for making api requests to
req =
Req.new(
url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
connect_options: [transport_opts: [inet6: true]],
receive_timeout: 120_000
)
# hard coded url with short sample audio
file_path =
file_path_from_url(
"https://s3-us-west-2.amazonaws.com/staging-moth-social/media_attachments/files/113/041/865/730/367/819/original/9fd57c861d8f3fc7.mp3"
)
multipart =
Multipart.new()
|> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
|> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))
content_length = Multipart.content_length(multipart)
content_type = Multipart.content_type(multipart, "multipart/form-data")
headers = [
{"Content-Type", content_type},
{"Content-Length", to_string(content_length)}
]
Req.post(
req,
headers: headers,
body: Multipart.body_stream(multipart),
into: fn {:data, data}, context ->
# fn {:ok, {ss, %{chunks: [%{text: text}]}}} -> func.(ss, text) end
results = parse(data)
IO.inspect(results)
IO.inspect(context, label: "CONTEXT")
segment = Enum.at(results.segments, 0)
callback.(trunc(segment.start), segment.text)
{:cont, context}
end
)
end
How could I gather up the incoming chunks in a list or collection so I have all the data to save to persistent?
Each returning data chunk looks like this:
%{
text: " At Apple Park.",
words: [],
task: "transcribe",
language: "en",
duration: 0.8200000000000003,
segments: [
%{
id: 14,
start: 54.64,
seek: 5736,
tokens: [51695, 1711, 6373, 4964, 13, 51736],
text: " At Apple Park.",
end: 55.46,
words: nil,
temperature: 0.0,
avg_logprob: -0.19924645728253304,
compression_ratio: 1.7433962264150944,
no_speech_prob: 1.9550323486328125e-5
}
]
}