jtomchak

jtomchak

Handling http response streaming

So I’m getting back a streaming response from a whisper api I set up to use fly.io GPU’s. So far I can call it, get back chunks as they come in using req’s into: parameter and put the text and timestamp on a Phoenix LiveView. What I’m struggling with is how to capture all the incoming chunks to a list so I can save the text segments to Postgres and/or to object storage so I don’t have to transcript that particular audio file again.

# calling transcribe_audio with the url
 
  def async_transcribe(%Episode{} = episode) do
    Task.Supervisor.start_child(NormanAi.TaskSupervisor, fn ->

      result =
        NormanAi.Audio.transcribe_audio(episode.enclosure_url, fn ss, text ->
          segment = %Episode.Transcript.Segment{ss: ss, text: text}
          broadcast!(episode.id, {segment, episode.id})
          IO.inspect("BROADCASTING")

          segment
        end)

      IO.inspect(result)
      # Repo.update_all(from(e in Episode, where: e.id == ^episode.id),
      #   set: [transcript: %Episode.Transcript{segments: segments}]
      # )
    end)
  end

Then the work is done here

  def transcribe_audio(url, callback) do
# internal fly.io url for making api requests to
    req =
      Req.new(
        url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
        connect_options: [transport_opts: [inet6: true]],
        receive_timeout: 120_000
      )

# hard coded url with short sample audio
    file_path =
      file_path_from_url(
        "https://s3-us-west-2.amazonaws.com/staging-moth-social/media_attachments/files/113/041/865/730/367/819/original/9fd57c861d8f3fc7.mp3"
      )

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
      |> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    Req.post(
      req,
      headers: headers,
      body: Multipart.body_stream(multipart),
      into: fn {:data, data}, context ->
        # fn {:ok, {ss, %{chunks: [%{text: text}]}}} -> func.(ss, text) end
        results = parse(data)
        IO.inspect(results)
        IO.inspect(context, label: "CONTEXT")
        segment = Enum.at(results.segments, 0)
        callback.(trunc(segment.start), segment.text)
        {:cont, context}
      end
    )
  end

How could I gather up the incoming chunks in a list or collection so I have all the data to save to persistent?
Each returning data chunk looks like this:

%{
  text: " At Apple Park.",
  words: [],
  task: "transcribe",
  language: "en",
  duration: 0.8200000000000003,
  segments: [
    %{
      id: 14,
      start: 54.64,
      seek: 5736,
      tokens: [51695, 1711, 6373, 4964, 13, 51736],
      text: " At Apple Park.",
      end: 55.46,
      words: nil,
      temperature: 0.0,
      avg_logprob: -0.19924645728253304,
      compression_ratio: 1.7433962264150944,
      no_speech_prob: 1.9550323486328125e-5
    }
  ]
}

Marked As Solved

wojtekmach

wojtekmach

Hex Core Team

Yeah, sorry about that, it’s on my roadmap to improve documentation around this. req.private and resp.private exist solely so when writing steps and into: fun we can store intermediate state. The only restriction is private.req_* key names are reserved for Req for forward-compatibility.

Also Liked

garrison

garrison

I have never actually done this but my impression reading the docs is that you’re meant to use the response in the {req, resp} tuple (your context) as an accumulator. You could place the chunks in the :private field of resp under your own key. See :private under “Fields” at Req.Response.

Or you could use the response :body - not sure if it would get clobbered by anything else.

It would be nice if the docs for :into actually spelled this out as I was looking into this exact thing a couple weeks ago and had to infer that this was the intended approach. Seems like it would be a very common use case.

jtomchak

jtomchak

ok. so I should be able to use resp.private to append the chunks as they come back into a list. sweet. I’ll try it out and post the results. I tried with an Agent and managed to get the results back correctly, probably not idiomatic Elixir.

  def transcribe_audio(url, callback) do
    req =
      Req.new(
        url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
        connect_options: [transport_opts: [inet6: true]],
        receive_timeout: 120_000
      )

    file_path =
      file_path_from_url(
        "https://s3-us-west-2.amazonaws.com/staging-moth-social/media_attachments/files/113/041/865/730/367/819/original/9fd57c861d8f3fc7.mp3"
      )

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
      |> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    # Initialize buffer state
    {:ok, agent} = Agent.start_link(fn -> [] end)

    Req.post(
      req,
      headers: headers,
      body: Multipart.body_stream(multipart),
      into: fn {:data, data}, context ->
        # fn {:ok, {ss, %{chunks: [%{text: text}]}}} -> func.(ss, text) end
        results = parse(data)
        segment = Enum.at(results.segments, 0)
        callback.(trunc(segment.start), segment.text)
        :ok = Agent.update(agent, fn state -> [results | state] end)
        {:cont, context}
      end
    )

    # Make sure we shut the agent down
    data_results = Agent.get(agent, & &1) |> Enum.reverse()
    :ok = Agent.stop(agent)
    data_results
  end
wojtekmach

wojtekmach

Hex Core Team

put_private, update_private, etc return updated response, you need to reassign the resp variable.

Btw you can nowadays drop Multipart dependency since we have this capability built in, see encode_body/1 step.

Where Next?

Popular in Questions Top

lessless
I believe there are people here who are dealing with CSV files import on the daily basis, and since Excel is a really popular tool there ...
New
jaysoifer
Is there a way to rollback a specific migration and only that one ("skipping" all the other ones)? Would mix ecto.rollback -v 2008090...
New
pmjoe
I have a relationship of love and hate with Elixir. Lots of things are just absolutely right, but there are some things that are kind of ...
New
johnnyicon
Hi all, I've just started learning Elixir and Phoenix Framework, so please pardon my n00bness at this stage. I'm trying to use Postg...
New
itssasanka
Hi all, Trying to get some more clarity over utc_datetime and naive_datetime for Ecto: https://hexdocs.pm/ecto/Ecto.Schema.html#module-...
New
jerry
Good day to you all. I have been struggling to get a query involving like and ilike to work. Can anyone assist me on this, please? pro...
New
ycv005
I have followed this StackOverflow post to install the specific version of Erlang. And When I am running mix ecto.setup then getting fol...
New
rms.mrcs
Hi, I need to transform a list of numbers into a map where the keys are the indexes and the values are the original values of the list....
New
JDanielMartinez
Hi! May someone helps me, please! I have two apps into an umbrella project: the first one is Database, which manages queries, and the se...
New
openscript
Hello! Sorry for this astonishing simple question, but I’m really stuck. I try to set up the intellij-elixir plugin, but I don’t know ho...
New

Other popular topics Top

sen
Hi All, I set a environment variables in dev.exs , like below code. when i start server, how can i set the ${enable} value? thanks. d...
New
AstonJ
Posting this to see if we can make things easier for people to get into Neovim. If you use Neovim and have a favourite distro please let ...
New
JorisKok
I have a server on AWS, and was running a load test using artillery. When looking at the Phoenix dashboard I see the Ports going to 100% ...
New
freewebwithme
Using vs code and installed ElixirLS: support and debugger. And I got an error popped up on start up says Failed to run ‘elixir’ comma...
New
ashish173
I am using Ecto timestamps with postgres, I can see the timestamps() use the :naive_dateime but for my use case I wanted to store the ti...
New
dblack
I’ve got an issue with an app and I’ve no idea of how to troubleshoot it. I’m hoping someone here might have seen something similar. I p...
New
AstonJ
Please see the new poll here: Which code editor or IDE do you use? (Poll) (2022 Edition) It’s been a while since we first asked this, I...
208 31107 143
New
Brian
What is the proper way to load a module from a file in to IEX? In the python world, doing something like this pretty standard: from ....
New
dogweather
I wrote this comment on r/haskell, and it’s not popular there. :wink: But I think I’m on to something… Haskell reminds me of Java, and e...
New
svb
Hi! Currently I want to submit a form by pressing the Enter key. However, since my input field is of type “textarea” this is just adds a...
New

We're in Beta

About us Mission Statement