How to make a large JSON HTTP request without spiking the memory usage?

I need to make a POST request with JSON of considerable size (~10MB). Is it possible to make such a request without spiking the memory usage?

Thanks to Jason.encode_to_iodata!/2, the payload is an iodata. I’m using Finch to send my request:

payload = Jason.encode_to_iodata!(data)
headers = [{"content-type", "application/json"}]
request = Finch.build(:post, url, headers, payload)
Finch.request(request, MyFinch)

While the request is being processed, Observer shows a spike in memory usage:

Is there a way to avoid the second spike (with Finch or some other HTTP client)? I tried sending the payload as {:stream, list}, but I’m getting {:error, %Mint.TransportError{reason: :closed}}. Not sure if this is related to a specific server.

One of the values in the JSON is a pretty big blob, but I wrote a function that chunks it into smaller pieces, so I’d rule that factor out.
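For reference, Finch does accept a `{:stream, enumerable}` body. A hedged sketch of what I tried, except splitting the encoded payload into fixed-size binary chunks rather than passing the raw iodata list (the chunk size and variable names are illustrative, and flattening with `IO.iodata_to_binary/1` does itself make one full copy first):

```elixir
payload = Jason.encode_to_iodata!(data)

# Flatten the iodata, then lazily re-split it into bounded binary chunks so
# no single element handed to the transport forces a large allocation.
chunks =
  payload
  |> IO.iodata_to_binary()
  |> Stream.unfold(fn
    "" -> nil
    bin -> String.split_at(bin, 64 * 1024)
  end)

headers = [{"content-type", "application/json"}]
request = Finch.build(:post, url, headers, {:stream, chunks})
Finch.request(request, MyFinch)
```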


Since there is a big blob, maybe try :erlang.iolist_to_iovec/1 on the encoded JSON, and see if you can open the TCP connection with the socket backend (see the Erlang inet documentation). I don't see an option in Finch to forward socket opts, and Mint doesn't seem to use the socket backend by default, so it might be tricky. A bit of a shot in the dark: I did some testing last year and noticed that outgoing traffic consumed less memory with the socket backend (I can't remember for certain whether the payload was binary), although incoming memory seemed to increase slightly. That was moving 1-4 GB between a few cloud VMs.
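For concreteness, the conversion itself is a one-liner (whether Finch or Mint would pass the result through to the transport untouched is the open question):

```elixir
payload = Jason.encode_to_iodata!(data)

# :erlang.iolist_to_iovec/1 flattens nested iodata into a flat list of
# binaries without concatenating them into one contiguous allocation.
iovec = :erlang.iolist_to_iovec(payload)
```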

I guess there is inet_default_connect_options, which might be able to default the inet backend to socket. From the docs:

erl -sname test -kernel \
inet_default_connect_options '[{delay_send,true}]' \
inet_default_listen_options '[{delay_send,true}]'

Can you show in detail how you made that request?

I’ve spent some more time on this and dropped down to Mint to have a better understanding of what’s happening. The TransportError seems to be related to the specific server that I was testing with which I now replaced with a locally running Phoenix app.

To recap: I’m sending out emails via REST, so I need to send quite a lot of JSON requests that share a big binary blob (a base64-encoded attachment). I’m caching the blob, but I’m still facing memory usage spikes that eventually cause out-of-memory errors. Here’s the test script that I’m using to reproduce this:

Mix.install([:jason, :mint])
:observer.start()

defmodule TestMod do
  @scheme :https
  @host "localhost"
  @port 4040
  @method "POST"
  @path "/test"
  @headers [{"content-type", "application/json"}]
  @sleep 3000
  # Don't complain about a self-signed certificate
  @connect_opts [transport_opts: [verify: :verify_none]]

  def request_regular() do
    IO.puts("Loading data...")
    body = big_json("regular")
    Process.sleep(@sleep)
    IO.puts("Regular request...")
    {:ok, conn} = Mint.HTTP1.connect(@scheme, @host, @port, @connect_opts)
    {:ok, conn, request_ref} = Mint.HTTP1.request(conn, @method, @path, @headers, body)
    receive_response(conn, request_ref)
  end

  def request_stream() do
    IO.puts("Loading data...")
    body = big_json("stream")
    Process.sleep(@sleep)
    IO.puts("Streaming request body...")
    {:ok, conn} = Mint.HTTP1.connect(@scheme, @host, @port, @connect_opts)
    {:ok, conn, request_ref} = Mint.HTTP1.request(conn, @method, @path, @headers, :stream)
    {:ok, conn} = Mint.HTTP1.stream_request_body(conn, request_ref, body)
    {:ok, conn} = Mint.HTTP1.stream_request_body(conn, request_ref, :eof)
    receive_response(conn, request_ref)
  end

  def request_stream_chunked_iodata() do
    IO.puts("Loading data...")
    body = big_json("chunked iodata")
    Process.sleep(@sleep)
    IO.puts("Streaming request body as chunked iodata...")
    {:ok, conn} = Mint.HTTP1.connect(@scheme, @host, @port, @connect_opts)
    {:ok, conn, request_ref} = Mint.HTTP1.request(conn, @method, @path, @headers, :stream)
    chunked = chunk_iodata(body)
    {:ok, conn} = Mint.HTTP1.stream_request_body(conn, request_ref, chunked)
    {:ok, conn} = Mint.HTTP1.stream_request_body(conn, request_ref, :eof)
    receive_response(conn, request_ref)
  end

  def request_stream_chunked_binary() do
    IO.puts("Loading data...")
    body = big_json("chunked binaries")
    Process.sleep(@sleep)
    IO.puts("Streaming request body in binary chunks...")
    {:ok, conn} = Mint.HTTP1.connect(@scheme, @host, @port, @connect_opts)
    {:ok, conn, request_ref} = Mint.HTTP1.request(conn, @method, @path, @headers, :stream)
    chunks = iodata_to_binary_chunks(body, [])

    {:ok, conn} =
      Enum.reduce(chunks, {:ok, conn}, fn chunk, {:ok, conn} ->
        Mint.HTTP1.stream_request_body(conn, request_ref, chunk)
      end)

    {:ok, conn} = Mint.HTTP1.stream_request_body(conn, request_ref, :eof)
    receive_response(conn, request_ref)
  end

  def receive_response(conn, request_ref, result \\ %{}) do
    receive do
      message ->
        case Mint.HTTP1.stream(conn, message) do
          :unknown ->
            IO.inspect(message, label: ">>> unknown message")
            receive_response(conn, request_ref, result)

          {:ok, conn, responses} ->
            reduced =
              Enum.reduce(responses, result, fn response, result ->
                case response do
                  {:status, _request_ref, status} ->
                    Map.put(result, :status, status)

                  {:headers, _request_ref, headers} ->
                    Map.put(result, :headers, headers)

                  {:data, _request_ref, body_chunk} ->
                    Map.update(result, :body, body_chunk, fn body ->
                      body <> body_chunk
                    end)

                  {:done, _request_ref} ->
                    {:ok, result}

                  {:error, _request_ref, reason} ->
                    {:error, reason}
                end
              end)

            case reduced do
              {:ok, result} -> {:ok, result}
              {:error, reason} -> {:error, reason}
              result -> receive_response(conn, request_ref, result)
            end
        end
    end
  end

  def big_binary(megabytes \\ 10) do
    String.duplicate("-", 1024 * 1024 * megabytes)
  end

  def big_json(foo, megabytes \\ 10) do
    Jason.encode_to_iodata!(%{content: big_binary(megabytes), foo: foo})
  end

  def iodata_to_binary_chunks([], acc) do
    Enum.reverse(acc)
  end

  def iodata_to_binary_chunks(list, acc) do
    case hd(list) do
      number when is_integer(number) ->
        iodata_to_binary_chunks(tl(list), [<<number>> | acc])

      binary when is_binary(binary) ->
        iodata_to_binary_chunks(tl(list), chunk_binary(binary, acc))

      [] ->
        iodata_to_binary_chunks(tl(list), acc)

      nested when is_list(nested) ->
        iodata_to_binary_chunks([hd(nested), tl(nested) | tl(list)], acc)
    end
  end

  @binary_chunk_size 102_400

  def chunk_binary(str, acc) do
    case String.split_at(str, @binary_chunk_size) do
      {slice, ""} -> [slice | acc]
      {slice, rest} -> chunk_binary(rest, [slice | acc])
    end
  end

  def chunk_iodata([]) do
    []
  end

  def chunk_iodata(list) when is_list(list) do
    [chunk_iodata(hd(list)) | chunk_iodata(tl(list))]
  end

  def chunk_iodata(binary) when is_binary(binary) do
    chunk_binary(binary, []) |> Enum.reverse()
  end

  def chunk_iodata(other) do
    other
  end
end

I’m testing the following approaches:

  1. Sending iodata,
  2. Streaming all iodata at once,
  3. Streaming all iodata but chunking big binaries into smaller ones,
  4. Streaming in chunks of binaries: converting iodata to a list of binary chunks where each chunk has a max size limit.

It looks like only the last approach avoids memory spikes. Since it seems the problem only shows up with HTTPS and not HTTP, I’m guessing this has something to do with how the data is encrypted by the :ssl module. My working theory is that the last approach fits nicely with the encryption algorithm. But I’d prefer for the chunking to happen somewhere in the library code, not mine. Otherwise it feels like the solution is a bit magical and quite fragile.
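The fourth approach can be distilled from the script above into a small reusable helper (module and function names are hypothetical; the 100 KiB chunk size is the one from the script, and errors from `stream_request_body/3` would surface as match failures in this sketch):

```elixir
defmodule ChunkedBody do
  @chunk_size 102_400

  # Stream any iodata body through a Mint connection in bounded binary
  # chunks, so the transport never receives one huge binary at once.
  def stream_body(conn, request_ref, iodata) do
    iodata
    |> IO.iodata_to_binary()
    |> chunk()
    |> Enum.reduce({:ok, conn}, fn chunk, {:ok, conn} ->
      Mint.HTTP1.stream_request_body(conn, request_ref, chunk)
    end)
  end

  defp chunk(""), do: []

  defp chunk(binary) do
    {head, rest} = String.split_at(binary, @chunk_size)
    [head | chunk(rest)]
  end
end
```

The caller would then follow it with `Mint.HTTP1.stream_request_body(conn, request_ref, :eof)` as in the script.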

@ericmj @whatyouhide @voltone I’d appreciate if you could chime in on this.


I highly doubt this is in any way related to ssl; the symmetric encryption used on the wire adds very little overhead. It is more likely the overhead introduced by the JSON format, the base64 encoding, or both. If you have the possibility of changing the data format, it would be much better to use a mechanism like multipart requests and ditch base64 entirely. While your solution might work, I’m not sure how easily it integrates with tools that collect telemetry, or how debuggable it is should something go wrong.

Doesn’t seem like this has anything to do with JSON or base64 encoding. I’ve changed the script to POST plain text and the results are the same (the original script already worked with iodata, so the content was pretty much irrelevant at that point).

Could the TLS support be flattening the iolist into a single binary, causing a large allocation?

I traced what Mint is doing and dropped down one level to :ssl and I consistently get the same results:

defmodule SslTest do
  def run() do
    host = "localhost"
    port = 4040
    method = "POST"
    path = "/test/text"
    content_length = 10 * 1024 * 1024
    ssl_opts = [verify: :verify_none]

    body = String.duplicate("-", content_length)
    chunks = [body]
    #chunks = chunk_binary(body, 1024 * 10)

    IO.puts("waiting 3 seconds...")
    Process.sleep(3000)

    headers = [
      {"host", "#{host}:#{port}"},
      {"content-type", "text/plain"},
      {"content-length", content_length}
    ]

    {:ok, socket} =
      :ssl.connect(
        String.to_charlist(host),
        port,
        ssl_opts(host, ssl_opts)
      )

    request = [
      "#{method} #{path} HTTP/1.1\r\n",
      Enum.map(headers, fn {name, value} -> "#{name}: #{value}\r\n" end),
      "\r\n"
    ]

    :ok = :ssl.send(socket, request)

    for chunk <- chunks do
      :ok = :ssl.send(socket, chunk)
    end

    Process.sleep(500)
    {:ok, response} = :ssl.recv(socket, 0, 0)
    :ok = :ssl.close(socket)

    IO.puts(response)
  end

  def ssl_opts(hostname, opts) do
    [
      server_name_indication: String.to_charlist(hostname),
      versions: [:"tlsv1.3", :"tlsv1.2"],
      depth: 4,
      secure_renegotiate: true,
      reuse_sessions: true,
      packet: :raw,
      mode: :binary,
      active: false,
      ciphers: [
        %{cipher: :aes_256_gcm, key_exchange: :any, mac: :aead, prf: :sha384},
        %{cipher: :aes_128_gcm, key_exchange: :any, mac: :aead, prf: :sha256},
        %{cipher: :chacha20_poly1305, key_exchange: :any, mac: :aead, prf: :sha256},
        %{cipher: :aes_128_ccm, key_exchange: :any, mac: :aead, prf: :sha256},
        %{cipher: :aes_128_ccm_8, key_exchange: :any, mac: :aead, prf: :sha256},
        %{
          cipher: :aes_256_gcm,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :sha384
        },
        %{cipher: :aes_256_gcm, key_exchange: :ecdhe_rsa, mac: :aead, prf: :sha384},
        %{
          cipher: :aes_256_ccm,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_ccm_8,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :default_prf
        },
        %{
          cipher: :chacha20_poly1305,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :sha256
        },
        %{
          cipher: :chacha20_poly1305,
          key_exchange: :ecdhe_rsa,
          mac: :aead,
          prf: :sha256
        },
        %{
          cipher: :aes_128_gcm,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :sha256
        },
        %{cipher: :aes_128_gcm, key_exchange: :ecdhe_rsa, mac: :aead, prf: :sha256},
        %{
          cipher: :aes_128_ccm,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_ccm_8,
          key_exchange: :ecdhe_ecdsa,
          mac: :aead,
          prf: :default_prf
        },
        %{cipher: :aes_256_gcm, key_exchange: :dhe_rsa, mac: :aead, prf: :sha384},
        %{cipher: :aes_256_gcm, key_exchange: :dhe_dss, mac: :aead, prf: :sha384},
        %{
          cipher: :aes_256_cbc,
          key_exchange: :dhe_rsa,
          mac: :sha256,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :dhe_dss,
          mac: :sha256,
          prf: :default_prf
        },
        %{cipher: :aes_128_gcm, key_exchange: :dhe_rsa, mac: :aead, prf: :sha256},
        %{cipher: :aes_128_gcm, key_exchange: :dhe_dss, mac: :aead, prf: :sha256},
        %{
          cipher: :chacha20_poly1305,
          key_exchange: :dhe_rsa,
          mac: :aead,
          prf: :sha256
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :dhe_rsa,
          mac: :sha256,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :dhe_dss,
          mac: :sha256,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :ecdhe_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :ecdhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :ecdh_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :ecdh_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :ecdhe_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :ecdhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :ecdh_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :ecdh_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :dhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :dhe_dss,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :dhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :dhe_dss,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :rsa_psk,
          mac: :sha256,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :rsa_psk,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :rsa_psk,
          mac: :sha,
          prf: :default_prf
        },
        %{cipher: :rc4_128, key_exchange: :rsa_psk, mac: :sha, prf: :default_prf},
        %{
          cipher: :aes_256_cbc,
          key_exchange: :srp_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_256_cbc,
          key_exchange: :srp_dss,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :srp_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :aes_128_cbc,
          key_exchange: :srp_dss,
          mac: :sha,
          prf: :default_prf
        },
        %{cipher: :aes_256_cbc, key_exchange: :rsa, mac: :sha256, prf: :default_prf},
        %{cipher: :aes_128_cbc, key_exchange: :rsa, mac: :sha256, prf: :default_prf},
        %{cipher: :aes_256_cbc, key_exchange: :rsa, mac: :sha, prf: :default_prf},
        %{cipher: :aes_128_cbc, key_exchange: :rsa, mac: :sha, prf: :default_prf},
        %{cipher: :"3des_ede_cbc", key_exchange: :rsa, mac: :sha, prf: :default_prf},
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :ecdhe_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :ecdhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :dhe_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :dhe_dss,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :ecdh_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{
          cipher: :"3des_ede_cbc",
          key_exchange: :ecdh_rsa,
          mac: :sha,
          prf: :default_prf
        },
        %{cipher: :des_cbc, key_exchange: :dhe_rsa, mac: :sha, prf: :default_prf},
        %{cipher: :des_cbc, key_exchange: :rsa, mac: :sha, prf: :default_prf},
        %{
          cipher: :rc4_128,
          key_exchange: :ecdhe_ecdsa,
          mac: :sha,
          prf: :default_prf
        },
        %{cipher: :rc4_128, key_exchange: :ecdhe_rsa, mac: :sha, prf: :default_prf},
        %{cipher: :rc4_128, key_exchange: :ecdh_ecdsa, mac: :sha, prf: :default_prf},
        %{cipher: :rc4_128, key_exchange: :ecdh_rsa, mac: :sha, prf: :default_prf},
        %{cipher: :rc4_128, key_exchange: :rsa, mac: :sha, prf: :default_prf},
        %{cipher: :rc4_128, key_exchange: :rsa, mac: :md5, prf: :default_prf}
      ]
    ]
    |> Keyword.merge(opts)
  end

  def chunk_binary(str, size \\ 102_400, acc \\ []) do
    case String.split_at(str, size) do
      {slice, ""} -> Enum.reverse([slice | acc])
      {slice, rest} -> chunk_binary(rest, size, [slice | acc])
    end
  end
end

If you uncomment the line with the call to chunk_binary, the memory spike disappears.

Interesting. One thing in the ssl application that caught my eye was the tls_record:encode_data function. Consider the following:

> body = String.duplicate("-", 10 * 1024 * 1024)
> :erlang.memory
[
  total: 44051952,
  processes: 5186904,
  processes_used: 5186032,
  system: 38865048,
  atom: 336049,
  atom_used: 327632,
  binary: 10548400,
  code: 8012598,
  ets: 454184
]
> :tls_record.encode_data([body], {3, 0}, %{:current_write => %{:beast_mitigation => :one_n_minus_one, :max_fragment_length => :undefined, :security_parameters => :ssl_record.initial_security_params(1)}})
...
> :erlang.memory
[
  total: 46457456,
  processes: 4910424,
  processes_used: 4909512,
  system: 41547032,
  atom: 442553,
  atom_used: 418066,
  binary: 11842480,
  code: 8318166,
  ets: 466312
]

That seems to create a ~2.5 MB spike even though the function doesn’t complete successfully. When the chunking is applied, the delta doesn’t occur when calling that function (although the total memory seems generally higher, ~50 MB instead of ~44 MB in my shell). I also called :tls_record.encode_data without wrapping the chunked binary in a list. Not scientific, but it could be a good place to dig in further. Changing beast_mitigation to disabled didn’t seem to help, either.
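To make comparisons like this a bit more repeatable than eyeballing :erlang.memory dumps, a rough measurement harness can snapshot binary memory around a call (a sketch only; GC timing makes this approximate, and `chunks` and `socket` are assumed from the snippets above):

```elixir
# Force a GC, snapshot binary-heap memory, run the function, report the delta.
# Not precise, but enough to spot a multi-megabyte allocation spike.
measure = fn fun ->
  :erlang.garbage_collect()
  before = :erlang.memory(:binary)
  fun.()
  :erlang.memory(:binary) - before
end

measure.(fn -> Enum.each(chunks, &:ssl.send(socket, &1)) end)
|> IO.inspect(label: "binary bytes allocated during send")
```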

Thanks, I’ll dig into that! FYI I cross-posted this to the Erlang forum too.

Suggestion: Add a link from the Erlang Forum thread to this thread. This allows people to read both threads and avoid duplicate work.
