Handling http response streaming

So I’m getting back a streaming response from a whisper api I set up to use fly.io GPU’s. So far I can call it, get back chunks as they come in using req’s into: parameter and put the text and timestamp on a Phoenix LiveView. What I’m struggling with is how to capture all the incoming chunks to a list so I can save the text segments to Postgres and/or to object storage so I don’t have to transcript that particular audio file again.

# calling transcribe_audio with the url
 
  def async_transcribe(%Episode{} = episode) do
    Task.Supervisor.start_child(NormanAi.TaskSupervisor, fn ->

      result =
        NormanAi.Audio.transcribe_audio(episode.enclosure_url, fn ss, text ->
          segment = %Episode.Transcript.Segment{ss: ss, text: text}
          broadcast!(episode.id, {segment, episode.id})
          IO.inspect("BROADCASTING")

          segment
        end)

      IO.inspect(result)
      # Repo.update_all(from(e in Episode, where: e.id == ^episode.id),
      #   set: [transcript: %Episode.Transcript{segments: segments}]
      # )
    end)
  end

Then the work is done here

  def transcribe_audio(url, callback) do
# internal fly.io url for making api requests to
    req =
      Req.new(
        url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
        connect_options: [transport_opts: [inet6: true]],
        receive_timeout: 120_000
      )

# hard coded url with short sample audio
    file_path =
      file_path_from_url(
        "https://s3-us-west-2.amazonaws.com/staging-moth-social/media_attachments/files/113/041/865/730/367/819/original/9fd57c861d8f3fc7.mp3"
      )

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
      |> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    Req.post(
      req,
      headers: headers,
      body: Multipart.body_stream(multipart),
      into: fn {:data, data}, context ->
        # fn {:ok, {ss, %{chunks: [%{text: text}]}}} -> func.(ss, text) end
        results = parse(data)
        IO.inspect(results)
        IO.inspect(context, label: "CONTEXT")
        segment = Enum.at(results.segments, 0)
        callback.(trunc(segment.start), segment.text)
        {:cont, context}
      end
    )
  end

How could I gather up the incoming chunks in a list or collection so I have all the data to save to persistent?
Each returning data chunk looks like this:

%{
  text: " At Apple Park.",
  words: [],
  task: "transcribe",
  language: "en",
  duration: 0.8200000000000003,
  segments: [
    %{
      id: 14,
      start: 54.64,
      seek: 5736,
      tokens: [51695, 1711, 6373, 4964, 13, 51736],
      text: " At Apple Park.",
      end: 55.46,
      words: nil,
      temperature: 0.0,
      avg_logprob: -0.19924645728253304,
      compression_ratio: 1.7433962264150944,
      no_speech_prob: 1.9550323486328125e-5
    }
  ]
}

I have never actually done this but my impression reading the docs is that you’re meant to use the response in the {req, resp} tuple (your context) as an accumulator. You could place the chunks in the :private field of resp under your own key. See :private under “Fields” at Req.Response.

Or you could use the response :body - not sure if it would get clobbered by anything else.

It would be nice if the docs for :into actually spelled this out as I was looking into this exact thing a couple weeks ago and had to infer that this was the intended approach. Seems like it would be a very common use case.

2 Likes

Yeah I saw the :private field but was scared off with the detail

a map reserved for libraries and frameworks to use. Prefix the keys with the name of your project to avoid any future conflicts. Only accepts atom/0 keys.

But it’s worth a shot

1 Like

Yeah, sorry about that, it’s on my roadmap to improve documentation around this. req.private and resp.private exist solely so when writing steps and into: fun we can store intermediate state. The only restriction is private.req_* key names are reserved for Req for forward-compatibility.

4 Likes

ok. so I should be able to use resp.private to append the chunks as they come back into a list. sweet. I’ll try it out and post the results. I tried with an Agent and managed to get the results back correctly, probably not idiomatic Elixir.

  def transcribe_audio(url, callback) do
    req =
      Req.new(
        url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
        connect_options: [transport_opts: [inet6: true]],
        receive_timeout: 120_000
      )

    file_path =
      file_path_from_url(
        "https://s3-us-west-2.amazonaws.com/staging-moth-social/media_attachments/files/113/041/865/730/367/819/original/9fd57c861d8f3fc7.mp3"
      )

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
      |> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    # Initialize buffer state
    {:ok, agent} = Agent.start_link(fn -> [] end)

    Req.post(
      req,
      headers: headers,
      body: Multipart.body_stream(multipart),
      into: fn {:data, data}, context ->
        # fn {:ok, {ss, %{chunks: [%{text: text}]}}} -> func.(ss, text) end
        results = parse(data)
        segment = Enum.at(results.segments, 0)
        callback.(trunc(segment.start), segment.text)
        :ok = Agent.update(agent, fn state -> [results | state] end)
        {:cont, context}
      end
    )

    # Make sure we shut the agent down
    data_results = Agent.get(agent, & &1) |> Enum.reverse()
    :ok = Agent.stop(agent)
    data_results
  end
1 Like

I don’t seem to be able to append/update :private from within the into: fun

 def transcribe_audio(url, callback) do
    req =
      Req.new(
        url: "http://faster-whisper-server-patient-voice-8559.flycast/v1/audio/transcriptions",
        connect_options: [transport_opts: [inet6: true]],
        receive_timeout: 120_000
      )

    file_path =
      file_path_from_url(
        "https://s3-us-west-2.amazonaws.com/moth-social/media_attachments/files/113/087/363/649/463/883/original/6f9b1ad7ff1c0018.mp3"
      )

    multipart =
      Multipart.new()
      |> Multipart.add_part(Multipart.Part.text_field("true", "stream"))
      |> Multipart.add_part(Multipart.Part.file_field(file_path, "file"))

    content_length = Multipart.content_length(multipart)
    content_type = Multipart.content_type(multipart, "multipart/form-data")

    headers = [
      {"Content-Type", content_type},
      {"Content-Length", to_string(content_length)}
    ]

    {:ok, response} =
      Req.post(
        req,
        headers: headers,
        body: Multipart.body_stream(multipart),
        into: fn {:data, data}, {req, resp} ->
          # try to add any value to the response
          Req.Response.put_private(resp, :transcription, [{%{:b => "moe"}}])
          results = parse(data)
          segment = Enum.at(results.segments, 0)
          callback.(trunc(segment.start), segment.text)
          # update private response
          Req.Response.update_private(resp, :transcript, [], fn state -> [results | state] end)
          IO.inspect(resp.private)
          {:cont, {req, resp}}
        end
      )

    Req.Response.put_private(response, :something_else, [{%{:a => "ted"}}])

    # Make sure we shut the agent down
    # data_results = Agent.get(agent, & &1) |> Enum.reverse()
    # :ok = Agent.stop(agent)
    # data_results
  end

the put_private or update_private from the into: fun are not on the return response, but the final put_private is added

#Req.Response<
  status: 200,
  headers: %{
    "content-type" => ["text/event-stream; charset=utf-8"],
    "date" => ["Thu, 05 Sep 2024 23:39:26 GMT"],
    "fly-request-id" => ["01J728X25018F47F3W0KD31EJ1-ord"],
    "server" => ["Fly/813cb6e67 (2024-09-03)"],
    "transfer-encoding" => ["chunked"],
    "via" => ["1.1 fly.io"]
  },
  body: "",
  trailers: %{},
  private: %{something_else: [{%{a: "ted"}}]},
  ...
>
1 Like

put_private, update_private, etc return updated response, you need to reassign the resp variable.

Btw you can nowadays drop Multipart dependency since we have this capability built in, see encode_body/1 step.

2 Likes

:man_facepalming: d’oh. There we go. Thank you for your help, and the tip on multi part. :+1:

1 Like