Aborting Req request if max size exceeded while streaming

Hi!

I have a Downloader module that GETs files given a URL, using Req. It has a timeout option but I would like to abort the download when it exceeds a given maximum size, so I don’t have to wait until the file has been completely downloaded to check the final size.

I think that can be done using Req’s streaming capabilities, but I haven’t been able to get it working. Can anyone help with this? This is my module so far:

defmodule Utils.Downloader do
  @moduledoc """
  Downloads a file over HTTP with `Req` and returns its body and content type.
  """

  require Logger

  @default_receive_timeout_ms 20_000

  @doc """
  Downloads `url` with a GET request.

  ## Options

    * `:receive_timeout_ms` - receive timeout in milliseconds passed to `Req`
      (defaults to 20_000).

  ## Return values

    * `{:ok, %{content_type: content_type, data: body}}` - the server answered 200
      and sent a `content-type` header.
    * `{:error, :cant_get_content_type}` - the response has no `content-type` header.
    * `{:error, :cant_download_image}` - the server answered with a non-200 status.
    * `{:error, exception}` - the request itself failed; the exception returned by
      `Req.get/2` is passed through unchanged.
  """
  def download(url, opts \\ []) do
    Logger.debug("[#{__MODULE__}] downloading...")

    with {:ok, req_client} <- prepare_req_client(opts),
         {:ok, %Req.Response{status: 200, body: body, headers: headers}} <-
           Req.get(req_client, url: url),
         {:ok, content_type} <- get_content_type(headers) do
      Logger.debug("[#{__MODULE__}] finished downloading...")

      {:ok, %{content_type: content_type, data: body}}
    else
      {:error, :cant_get_content_type} ->
        {:error, :cant_get_content_type}

      # Req.get/2 wraps the response in an :ok tuple, so a non-200 answer reaches
      # this branch as {:ok, response}, never as a bare struct.
      {:ok, %Req.Response{}} ->
        {:error, :cant_download_image}

      # Failed requests are handed back as-is instead of raising WithClauseError.
      {:error, _exception} = error ->
        error
    end
  end

  # Looks up the content-type header case-insensitively. Handles header values
  # stored either as a list of strings or as a single string.
  defp get_content_type(headers) do
    headers
    |> Enum.find(fn {key, _} -> String.downcase(key) == "content-type" end)
    |> case do
      {_, value} when is_list(value) -> {:ok, hd(value)}
      {_, value} when is_binary(value) -> {:ok, value}
      nil -> {:error, :cant_get_content_type}
    end
  end

  # Builds the Req client with the configured (or default) receive timeout.
  defp prepare_req_client(opts) do
    receive_timeout_ms = opts[:receive_timeout_ms] || @default_receive_timeout_ms

    client = Req.new(receive_timeout: receive_timeout_ms)

    {:ok, client}
  end
end

1 Like

Welcome to the forum @spurgus!

If the response includes the content-length header then you can check that from a response step:

req =
  Req.Request.prepend_response_steps(Req.new(),
    validate_content_length: fn {request, response} ->
      # Calls Req.cancel_async_response/1 and replaces the response with an error.
      reject = fn message ->
        Req.cancel_async_response(response)
        {request, RuntimeError.exception(message: message)}
      end

      # Anything other than exactly one content-length header counts as invalid,
      # which includes a missing header.
      declared =
        case Req.Response.get_header(response, "content-length") do
          [raw] -> Integer.parse(raw)
          _other -> :error
        end

      case declared do
        {size, ""} when size > @max_content_length -> reject.("content-length too large")
        {_size, ""} -> {request, response}
        _invalid -> reject.("Invalid content-length")
      end
    end
  )

Otherwise, you’ll have to keep track of received bytes and halt the request. You mentioned streaming but didn’t say which form of streaming (into: :self, into: &fun/2, into: collectable). Here’s an example for the function form of streaming:

Req.get(req, into: fn {:data, data}, {req, resp} ->
  # Seed the counter with the first chunk's size: update_private/4 stores the
  # default as-is (it is not passed through the function), so a default of 0
  # would leave the first chunk uncounted.
  resp = Req.Response.update_private(resp, :length, byte_size(data), &(&1 + byte_size(data)))

  if Req.Response.get_private(resp, :length) > @max_content_length do
    # Stop streaming and return the error in place of the response.
    {:halt, {req, RuntimeError.exception(message: "content length too large")}}
  else
    {:cont, {req, resp}}
  end
end)
1 Like

Thanks a lot for your help @jswanner !

Most of the time the content-length header will be present, but I think that checking the actual size while downloading is a good measure just in case it’s missing or wrong.

This is what I’m trying, but I get an empty body in the response??

defmodule Utils.Downloader do
  @moduledoc """
  Downloads a file over HTTP with `Req`, aborting the transfer once the streamed
  body grows beyond a maximum size.
  """

  require Logger

  @default_receive_timeout_ms 20_000
  @default_max_size_bytes 20 * 1_024 * 1_024

  @doc """
  Downloads `url` with a GET request, streaming the body and counting its bytes.

  ## Options

    * `:receive_timeout_ms` - receive timeout in milliseconds passed to `Req`
      (defaults to 20_000).
    * `:max_size_bytes` - maximum number of body bytes accepted before the
      download is halted (defaults to 20 MiB).

  ## Return values

    * `{:ok, %{content_type: content_type, data: body}}` - the server answered 200
      and sent a `content-type` header.
    * `{:error, :cant_get_content_type}` - the response has no `content-type` header.
    * `{:error, :cant_download}` - the server answered with a non-200 status.
    * `{:error, exception}` - the request failed or was halted because the size
      limit was exceeded; the exception is passed through unchanged.
  """
  def download(url, opts \\ []) do
    with {:ok, %Req.Response{status: 200, body: body, headers: headers}} <-
           request(url, opts),
         {:ok, content_type} <- get_content_type(headers) do
      Logger.debug("[#{__MODULE__}] finished downloading...")

      {:ok, %{content_type: content_type, data: body}}
    else
      {:error, :cant_get_content_type} ->
        {:error, :cant_get_content_type}

      # Req.get/2 wraps the response in an :ok tuple, so a non-200 answer reaches
      # this branch as {:ok, response}, never as a bare struct.
      {:ok, %Req.Response{}} ->
        {:error, :cant_download}

      # Covers failed requests and the RuntimeError produced when the streaming
      # callback halts; without it these would raise WithClauseError.
      {:error, _exception} = error ->
        error
    end
  end

  # Performs the GET request, streaming the body through a callback that keeps a
  # running byte count in the response's private data.
  defp request(url, opts) do
    Logger.debug("[#{__MODULE__}] downloading...")

    receive_timeout_ms = opts[:receive_timeout_ms] || @default_receive_timeout_ms
    max_size_bytes = opts[:max_size_bytes] || @default_max_size_bytes

    Req.new()
    |> Req.get(
      url: url,
      receive_timeout: receive_timeout_ms,
      into: fn {:data, data}, {req, resp} ->
        # The default is stored as-is on the first call, so it must already be
        # the first chunk's size for that chunk to be counted.
        resp =
          Req.Response.update_private(resp, :length, byte_size(data), &(&1 + byte_size(data)))

        if Req.Response.get_private(resp, :length) > max_size_bytes do
          {:halt, {req, RuntimeError.exception(message: "streamed content length too large")}}
        else
          # Req does not accumulate the body when :into is a function, so each
          # chunk is appended here; otherwise the response body stays empty.
          {:cont, {req, update_in(resp.body, &(&1 <> data))}}
        end
      end
    )
  end

  # Looks up the content-type header case-insensitively. Handles header values
  # stored either as a list of strings or as a single string.
  defp get_content_type(headers) do
    headers
    |> Enum.find(fn {key, _} -> String.downcase(key) == "content-type" end)
    |> case do
      {_, value} when is_list(value) -> {:ok, hd(value)}
      {_, value} when is_binary(value) -> {:ok, value}
      nil -> {:error, :cant_get_content_type}
    end
  end
end

Try:

# Append each chunk to the response body; the function form of :into does not accumulate it.
{:cont, {req, update_in(resp.body, &(&1 <> data))}}

I believe this into: &fun/2 option is envisioned for scenarios where you’ll be doing something with the data as it’s coming in (such as sending it to another process), rather than accumulating it and processing it at the end.

1 Like

I see, thanks @jswanner - this is more complicated than I thought in the beginning, maybe I’ll just rely on the content-length header and timeouts to protect against downloading huge files.

I was not meaning to imply this use of into: &fun/2 is wrong, just pointing out Req doesn’t accumulate the body for you with this option (maybe it should?).

1 Like

Using :into like this is the correct approach. The example in the docs doesn’t accumulate the body (it just writes it to the console). Frankly this is not a very helpful example and should be updated.

You absolutely should not rely on content-length. There have been serious security problems caused by people mistakenly trusting that header. The server can lie about it at will, maliciously. A timeout is also not a good approach as there is no guarantee you’re not downloading a large file very quickly!

Yes, that’s my concern, someone trying to take your server down, making you download huge files, so my idea was to use streaming to abort the download as soon as the max size has been detected.

Let’s see if I can get this to work…

1 Like

Never trust headers unless you’ve validated them.

Once I had an Elixir bot running in a hostile environment (the Kodi addon ecosystem). I am quite sure it would not have survived the first week if I had trusted the headers. Content-length spoofing (read: simply returning an everlasting stream of random bits) was one of the first concerns.

1 Like

Oops, I just realized my suggestion had a bug that doesn’t include the first chunk of data in the calculated length, should be:

# Use the first chunk's size as the default so that the first chunk is included in the total.
Req.Response.update_private(resp, :length, byte_size(data), &(&1 + byte_size(data)))

Thanks all, I agree that the content-length header is not to be trusted, but then what’s the way to return the whole downloaded file once it has finished downloading (and abort if size exceeds during the streaming phase)?

Does this not work?

into: fn {:data, chunk}, {request, response} ->
  # Running byte total lives in the response's private data under :length,
  # seeded with the first chunk's size.
  chunk_size = byte_size(chunk)
  counted = Req.Response.update_private(response, :length, chunk_size, &(&1 + chunk_size))

  case Req.Response.get_private(counted, :length) do
    total when total > max_size_bytes ->
      # Over the limit: stop streaming and return an error instead of the response.
      {:halt, {request, RuntimeError.exception(message: "streamed content length too large")}}

    _within_limit ->
      # Still within the limit: append the chunk to the accumulated body.
      {:cont, {request, update_in(counted.body, &(&1 <> chunk))}}
  end
end
4 Likes

Hey thanks @jswanner this does seem to work!

I’ll check more but this looks great. Thanks a lot!

Thanks, everyone, for your help, you’re awesome!

For the record, here’s the full code:

defmodule Downloader do
  @moduledoc """
  Downloads a file over HTTP(S) with `Req`, aborting the transfer as soon as the
  streamed body exceeds a configurable maximum size.
  """

  require Logger

  @default_receive_timeout_ms 10_000
  @default_max_retries 3
  @default_max_size_bytes 20 * 1_024 * 1_024

  # Shared by the streaming callback that produces the size-limit error and the
  # `else` clause in `download/2` that recognises it, so the two cannot drift apart.
  @max_size_exceeded_message "streamed content length too large"

  @doc """
  Downloads `url` with a GET request, streaming the body and counting its bytes.

  ## Options

    * `:receive_timeout_ms` - receive timeout in milliseconds (defaults to 10_000).
    * `:max_retries` - value for Req's `:max_retries` option (defaults to 3).
    * `:max_size_bytes` - maximum number of body bytes accepted before the
      download is halted (defaults to 20 MiB).

  ## Return values

    * `{:ok, %{data: body, content_type: content_type}}` - the server answered 200;
      `content_type` is `nil` when the response has no `content-type` header.
    * `{:error, :invalid_url}` - `url` is not a binary with an `http`/`https`
      scheme and a host.
    * `{:error, :max_size_exceeded}` - the streamed body grew beyond the limit.
    * `{:error, :remote_server_error, status}` - the server answered with a
      non-200 status.
    * `{:error, :remote_server_error, reason}` - a transport error occurred
      (for example `:econnrefused` or `:timeout`).
    * `{:error, :remote_server_error, exception}` - the request failed with any
      other exception.
  """
  @spec download(term(), keyword()) ::
          {:ok, %{data: term(), content_type: String.t() | nil}}
          | {:error, :invalid_url | :max_size_exceeded}
          | {:error, :remote_server_error, term()}
  def download(url, opts \\ []) do
    Logger.debug("[#{__MODULE__}] downloading...")

    with {:ok, :valid_url} <- validate_url(url),
         {:ok, req_client} <- prepare_req_client(opts),
         {:ok, %Req.Response{status: 200, body: body, headers: headers}} <-
           Req.get(req_client, url: url),
         {:ok, content_type} <- get_header(headers, "content-type") do
      Logger.debug("[#{__MODULE__}] finished downloading...")

      {:ok, %{data: body, content_type: content_type}}
    else
      {:error, :invalid_url} ->
        {:error, :invalid_url}

      {:ok, %Req.Response{status: status}} ->
        {:error, :remote_server_error, status}

      {:error, %RuntimeError{message: @max_size_exceeded_message}} ->
        {:error, :max_size_exceeded}

      # Any transport failure, not only :econnrefused and :timeout, is reported
      # with its reason instead of raising WithClauseError.
      {:error, %Req.TransportError{reason: reason}} ->
        {:error, :remote_server_error, reason}

      # Every other failed request is reported with the exception itself.
      {:error, exception} ->
        {:error, :remote_server_error, exception}
    end
  end

  # Accepts only binaries that parse to an http/https URL with a host.
  defp validate_url(url) when is_binary(url) do
    case URI.parse(url) do
      %URI{scheme: scheme, host: host} when scheme in ["http", "https"] and is_binary(host) ->
        {:ok, :valid_url}

      _ ->
        {:error, :invalid_url}
    end
  end

  defp validate_url(_), do: {:error, :invalid_url}

  # Looks up header `name` (expected in lower case) case-insensitively. Handles
  # header values stored either as a list of strings or as a single string, and
  # returns {:ok, nil} when the header is absent.
  defp get_header(headers, name) do
    headers
    |> Enum.find(fn {key, _} -> String.downcase(key) == name end)
    |> case do
      {_, value} when is_list(value) -> {:ok, hd(value)}
      {_, value} when is_binary(value) -> {:ok, value}
      nil -> {:ok, nil}
    end
  end

  # Builds the Req client, including the streaming callback that enforces the
  # size limit while the body is being received.
  defp prepare_req_client(opts) do
    receive_timeout_ms = opts[:receive_timeout_ms] || @default_receive_timeout_ms
    max_retries = opts[:max_retries] || @default_max_retries
    max_size_bytes = opts[:max_size_bytes] || @default_max_size_bytes

    client =
      Req.new(
        receive_timeout: receive_timeout_ms,
        max_retries: max_retries,
        into: fn {:data, data}, {req, resp} ->
          # The default is stored as-is on the first call, so it must already be
          # the first chunk's size for that chunk to be counted.
          resp =
            Req.Response.update_private(
              resp,
              :length,
              byte_size(data),
              &(&1 + byte_size(data))
            )

          if Req.Response.get_private(resp, :length) > max_size_bytes do
            {:halt, {req, RuntimeError.exception(message: @max_size_exceeded_message)}}
          else
            # Req does not accumulate the body when :into is a function, so each
            # chunk is appended to the response body here.
            {:cont, {req, update_in(resp.body, &(&1 <> data))}}
          end
        end
      )

    {:ok, client}
  end
end

1 Like