How do I know when Finch.stream has finished fetching data?

I am using Finch.stream to fetch a large CSV file from a server, but neither the ‘done’ clause nor the ‘error’ clause of my callback is ever triggered, and returning "{:halt, {:error, reason}}" has no effect. How can I determine when the stream has finished fetching data?

Finch.build(:get, url, @headers)
    |> Finch.stream(
      Pdex.Finch,
      {"", {nil, nil}},
      fn
        {:status, status}, acc ->
          IO.inspect(status)
          {:cont, acc}

        {:headers, headers}, acc ->
          IO.inspect(headers)
          {:cont, acc}

        {:data, data}, acc ->
          {:cont, {:cont, {rest, state}}} = acc

          chunk = rest <> data

          # {:halt, {:error, "force not work"}}

          {:cont, {chunk, state}}
       
        # never gets called
        {:error, reason}, acc ->
          IO.inspect(acc, label: "Error")
          {:halt, {:error, reason}}

        # never gets called
        {:done, _}, acc ->
          IO.inspect(acc, label: "Done")
          {:ok, :done}
      end,
      receive_timeout: 30000
    )
  end

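I think you’re looking for stream_while instead of stream, which is documented in the Finch docs. The stream function does not treat the value returned from the callback as a control tuple; whatever the callback returns simply becomes the next accumulator. stream_while, by contrast, looks for the :cont/:halt tuples.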
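To make the difference between the two callback contracts concrete, here is a minimal sketch. It does not call Finch at all: the `CallbackContract` module and its event list are hypothetical stand-ins, and `Enum.reduce/3` and `Enum.reduce_while/3` only imitate how stream and stream_while drive the callback.

```elixir
defmodule CallbackContract do
  # Hypothetical stand-in for the events Finch hands to the callback.
  @events [{:status, 200}, {:headers, []}, {:data, "a,b\n"}, {:data, "1,2\n"}]

  def events, do: @events

  # stream style: the return value IS the next accumulator.
  def stream_style({:data, data}, acc), do: acc <> data
  def stream_style(_other, acc), do: acc

  # stream_while style: the return value is {:cont, acc} or {:halt, acc}.
  def stream_while_style({:status, status}, acc) when status not in 200..299,
    do: {:halt, acc}

  def stream_while_style({:data, data}, acc), do: {:cont, acc <> data}
  def stream_while_style(_other, acc), do: {:cont, acc}
end

# Imitates Finch.stream: every return value is fed back in as the accumulator.
stream_body =
  Enum.reduce(CallbackContract.events(), "", &CallbackContract.stream_style/2)

# Imitates Finch.stream_while: {:halt, acc} would stop the loop early.
while_body =
  Enum.reduce_while(CallbackContract.events(), "", &CallbackContract.stream_while_style/2)

IO.inspect({stream_body, while_body})
```

Returning `{:cont, acc}` from a stream-style callback would therefore wrap the accumulator in another tuple on every event rather than control the flow.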

It seems that the third-party server I’m fetching the CSV data from doesn’t send trailers, which are part of the state in stream_loop. As a result, I can’t tell from stream_while when the server has finished. Does anyone have any other suggestions?

Finch.build(:get, url, @headers)
    |> Finch.stream_while(
      Pdex.Finch,
      {"", {nil, nil}},
      fn
        {:status, status}, acc when status in 200..299 ->
          IO.inspect(status)
          {:cont, acc}

        {:headers, headers}, acc ->
          IO.inspect(headers)
          {:cont, acc}

        {:data, data}, acc ->
          process_stream_data(data, acc, callback)

        # never gets called
        {:trailers, trailers}, acc ->
          IO.inspect(trailers, label: "trailers")
          {:cont, acc}
      end,
      receive_timeout: 3_000_000
    )

Does the server not send HTTP’s Content-Length header?

Trailers are optional and aren’t expected in every response. Look for the :done callback, instead.

Yes, it was included. I tried the ‘done’ state, but according to the documentation, ‘stream_while’ does not have a ‘done’ state, and the server does not respond with trailers when it finishes. Here are the response headers:

[
  {"date", "Thu, 02 May 2024 08:40:14 GMT"},
  {"content-type", "text/plain;charset=UTF-8"},
  {"content-length", "988427"},
  {"connection", "keep-alive"},
  {"server", "nginx"},
  {"vary", "Accept-Encoding"},
  {"content-disposition",
   "attachment; filename=\"file.csv\""},
  {"cache-control", "no-cache, private"},
  {"strict-transport-security", "max-age=31536000; includeSubDomains"},
  {"x-frame-options", "SAMEORIGIN"}
]

Well, in that case I’d try downloading the file while counting bytes, since you have the expected total.

Finch.stream returns the acc on successful completion. You can use that to determine whether the stream has finished.

Finch.stream(Finch.build(:get, url), MyFinch, nil, fn
  {:status, status}, nil ->
    IO.inspect(status: status)
    0

  {:headers, headers}, length ->
    IO.inspect(headers: headers)
    length

  {:data, data}, length ->
    IO.inspect(data: IO.iodata_length(data))
    length + IO.iodata_length(data)
end)
|> case do
  {:ok, length} ->
    # streaming complete, handle `:done` state
    IO.inspect(total_length: length)
    
  {:error, error} -> 
    raise error
end

The server should close the connection once it has finished sending the file, which will cause stream to finish. If the server is not closing the connection, then you’ll need to use stream_while: store the content length from the headers and the running total of bytes received as data, and return :halt once you have all the data, which ends your side of the connection.
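The byte-counting approach could look roughly like the sketch below. `ByteCounter` is a hypothetical helper, not part of Finch, and it assumes the server sends a `content-length` header with a lowercase name, as in the headers posted above. The reducer is exercised here with simulated events so it can be run without a network; the commented-out lines show how it would presumably be handed to stream_while (`MyFinch` and `url` are placeholders).

```elixir
defmodule ByteCounter do
  # Hypothetical helper. Accumulator: {expected_bytes | nil, received_bytes, chunks}.
  def init, do: {nil, 0, []}

  def handle({:status, status}, acc) when status in 200..299, do: {:cont, acc}
  def handle({:status, status}, _acc), do: {:halt, {:error, {:unexpected_status, status}}}

  def handle({:headers, headers}, {expected, received, chunks}) do
    # Keep the previous value if this header block has no content-length.
    expected =
      case List.keyfind(headers, "content-length", 0) do
        {_name, value} -> String.to_integer(value)
        nil -> expected
      end

    {:cont, {expected, received, chunks}}
  end

  def handle({:data, data}, {expected, received, chunks}) do
    received = received + byte_size(data)
    acc = {expected, received, [data | chunks]}

    # Stop as soon as the advertised number of bytes has arrived.
    if is_integer(expected) and received >= expected, do: {:halt, acc}, else: {:cont, acc}
  end

  def handle({:trailers, _trailers}, acc), do: {:cont, acc}

  # Chunks are collected in reverse order, so flip them before joining.
  def body({_expected, _received, chunks}),
    do: chunks |> Enum.reverse() |> IO.iodata_to_binary()
end

# With Finch this would presumably be wired up as:
#   Finch.build(:get, url)
#   |> Finch.stream_while(MyFinch, ByteCounter.init(), &ByteCounter.handle/2)

# Simulated events standing in for a real response.
events = [
  {:status, 200},
  {:headers, [{"content-length", "8"}]},
  {:data, "a,b\n"},
  {:data, "1,2\n"},
  {:data, "never reached"}
]

final = Enum.reduce_while(events, ByteCounter.init(), &ByteCounter.handle/2)
IO.inspect(ByteCounter.body(final))
```

Collecting every chunk in memory is only for illustration; for a large CSV it would probably be better to process or write out each chunk as it arrives and keep just the two counters in the accumulator.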