IO Bottlenecked Processing

Hi All,

I am prototyping a system that uses Broadway to consume SQS messages and Flow to process each message. Each message is the S3 path of a large file: around 200 to 400 MB of gzip-compressed data, typically expanding to around 1 to 2 GB of text.

I’m using a few very large instances to process this data but am unable to get Elixir to use all of the CPU. Each worker uses its own hackney pool, so they should not be competing. Here’s what I have configured for the default pool:

  follow_redirect: true,
  # Must be longer than long-poll setting for SQS (10 seconds)
  recv_timeout: 30_000,
  timeout: 300_000,
  checkout_timeout: 30_000,
  max_connections: 50

I am using the following Broadway configuration (though I have tried many variants of this):

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producers: [
        default: [
          module:
            {BroadwaySQS.Producer,
             queue_name: "my-queue",
             max_number_of_messages: 10,
             wait_time_seconds: 4,
             visibility_timeout: 300,
             receive_interval: 10},
          stages: 5
        ]
      ],
      processors: [
        default: [
          max_demand: 1,
          stages: System.schedulers_online()
        ]
      ],
      batchers: [
        default: [
          batch_size: 10,
          batch_timeout: 10_000,
          stages: 5
        ]
      ]
    )

I have the following in my rel/vm.args to try to open up any VM level IO blockage, without any luck.

## Enable kernel poll and a few async threads
##+K true
+A 1024
## For OTP21+, the +A flag is not used anymore,
## +SDio replace it to use dirty schedulers
+SDio 1024
# Allow up to 10 million processes
+P 10000000

## Increase number of concurrent ports/sockets
-env ERL_MAX_PORTS 4096

I seem to have more luck in some areas with presigned URLs than with ExAws, but neither performs where I’d like.

Here is a screenshot of CPU utilization for 3 instances. I’ve tried creating more processes and I’ve tried running on just a single instance, and I can never really get it above 60-75% CPU.

For reference, here is my presigned URL processing function which produced the above graph:
(NOTE: I’m only downloading to disk because with streaming to memory it performs worse (0-1% CPU) using the code I wrote in this PR, though I can find nothing wrong with it)

  @read_chunk_size 1024 * 1024 * 10
  def process("https://" <> _ = presigned_url, pool_name) do
    temp_filename = "/tmp/#{pool_name}.tmp"
    _ = File.rm(temp_filename) # Ok if it doesn't exist
    {microseconds, _} =
      :timer.tc(fn ->
        Download.from(presigned_url, path: temp_filename, http_opts: [hackney: [pool: pool_name]])
      end)

    _ = Logger.info("File #{temp_filename} downloaded in #{microseconds / 1_000_000} seconds")

    result =
      temp_filename
      |> File.stream!([], @read_chunk_size)
      |> file_processing_stream()
      |> reduce_results()
      |> Enum.into(%{})

    File.rm!(temp_filename)

    result
  end

Download times also go up significantly under load, from 3-5 seconds to double or triple that.

What’s left that I haven’t tried?

What kind of throughput are you getting? How does that compare to what you expect?

You can see from the graphs that it ran for over 10 minutes. It should be way faster than that. It’s not using the CPU enough. The instance I’m using has a 25gbit network link, the best available. Using wget I can get a file at about 100mb/s, so 2-4 seconds.

I had trouble getting metrics out of hackney: when I add metrics, hackney doesn’t start up when built in a release (it works fine otherwise). https://github.com/benoitc/hackney/issues/567

I would like to see how many connections are opened during load to determine how the 25gbit is split. Doing some math, the configuration I have only leaves 32 mb/s of bandwidth per worker, so maybe that explains why it takes 10-15 seconds to fetch files. I’m going to try half the number of workers, which should give me 64-65 mb/s of bandwidth per worker, and see whether that improves download speed.
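
The per-worker arithmetic above can be checked with a short sketch. It assumes the 25 gigabit link is shared evenly by one downloader per core, which real traffic will not do exactly, and it reports megabytes (not megabits) per second. The module and function names are made up for illustration.

```elixir
defmodule BandwidthShare do
  @bits_per_byte 8

  # Even share of a link, in megabytes per second, for `workers` downloaders.
  # Assumption: the link is split perfectly evenly between workers.
  def mb_per_sec_per_worker(link_gbit_per_sec, workers) do
    link_gbit_per_sec * 1_000 / @bits_per_byte / workers
  end

  # Seconds needed to fetch `size_mb` megabytes at `rate_mb_per_sec`.
  def seconds_for(size_mb, rate_mb_per_sec), do: size_mb / rate_mb_per_sec
end

share = BandwidthShare.mb_per_sec_per_worker(25, 96)
IO.puts("per worker: #{Float.round(share, 1)} MB/s")
IO.puts("400 MB file: #{Float.round(BandwidthShare.seconds_for(400, share), 1)} s")
```

With 96 workers this gives roughly 32.6 MB/s each and about 12.3 seconds for a 400 MB file. With 48 workers the share doubles to roughly 65 MB/s.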

What are the specs of each instance?

m5.24xlarge: 96 cores, 384 GB RAM, 25 gigabit network link

Do you mean 100MB/s? Because at 100mb/s it should take much longer than 2-4 seconds per file.

Do you mean 32MB/s? Because at 32mb/s, it should take much longer than 10-15 seconds per file.

I’m not trying to be argumentative above. It’s just that so far, unless I’ve missed something, you haven’t provided some pretty key numbers that would let us figure out what bandwidth you’re actually using. Your graphs & numbers don’t tell us how many processes you’re running, nor how many files you’re transferring in the times shown. It would be easiest to answer my question if we just had a graph of network I/O, but lacking that, if we had other numbers we could estimate.

I’m asking because so far there’s nothing in your posts that eliminates bandwidth as the limiting factor instead of CPU. It seems perfectly possible that you’re saturating network or disk I/O at less than 100% CPU usage. And speaking of disk, what are you using? Something that can sustain 2.5 GB/s?

I’d definitely start by isolating the various parts of the system. Instead of reading messages from SQS, start with a pre-defined list and benchmark how long it takes to work through that.
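
A minimal sketch of that kind of isolated benchmark, with no SQS or Broadway involved. The `IsolatedBench` module, the example paths, and the `fake_process` function are hypothetical stand-ins; swap in the real download-and-process function to measure it.

```elixir
defmodule IsolatedBench do
  # Runs `work_fun` over a fixed list of inputs with bounded concurrency and
  # returns {elapsed_seconds, results}.
  def run(inputs, work_fun, concurrency \\ System.schedulers_online()) do
    {micros, results} =
      :timer.tc(fn ->
        inputs
        |> Task.async_stream(work_fun, max_concurrency: concurrency, timeout: :infinity)
        |> Enum.map(fn {:ok, result} -> result end)
      end)

    {micros / 1_000_000, results}
  end
end

# Hypothetical stand-in for "download and process one S3 path".
fake_process = fn path ->
  Process.sleep(50)
  {path, :done}
end

paths = for n <- 1..8, do: "s3://example-bucket/file-#{n}.gz"
{seconds, results} = IsolatedBench.run(paths, fake_process, 4)
IO.puts("processed #{length(results)} paths in #{Float.round(seconds, 2)} s")
```

Running the same list at a few different concurrency values should show whether throughput keeps scaling with workers or flattens out early.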

As far as I can tell, you’re processing one file per CPU. That is going to give suboptimal download performance, because S3 download speed per path isn’t particularly high. If you want to max that out, you want multiple concurrent downloaders per path, using something like https://hexdocs.pm/ex_aws_s3/ExAws.S3.html#download_file/4
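
To illustrate the idea of several downloaders per path, here is a rough sketch that splits one object into byte ranges and fetches them concurrently. It is not ExAws code: `RangedDownload` is a made-up module, and `fetch_range` is a caller-supplied function that, in a real system, would issue an HTTP GET with a `Range: bytes=first-last` header. Here it just slices an in-memory binary.

```elixir
defmodule RangedDownload do
  # Splits `size` bytes into inclusive {first, last} ranges of at most `chunk_size` bytes.
  def ranges(size, chunk_size) when size > 0 and chunk_size > 0 do
    0
    |> Stream.iterate(&(&1 + chunk_size))
    |> Stream.take_while(&(&1 < size))
    |> Enum.map(fn first -> {first, min(first + chunk_size, size) - 1} end)
  end

  # Fetches every range concurrently and reassembles the parts in order.
  # Task.async_stream/3 preserves input order by default.
  def fetch(size, chunk_size, fetch_range, concurrency) do
    size
    |> ranges(chunk_size)
    |> Task.async_stream(fetch_range, max_concurrency: concurrency, timeout: :infinity)
    |> Enum.map(fn {:ok, part} -> part end)
    |> IO.iodata_to_binary()
  end
end

# Stand-in for the remote object and for a ranged GET against it.
object = String.duplicate("0123456789", 100)
fetch_range = fn {first, last} -> binary_part(object, first, last - first + 1) end

copy = RangedDownload.fetch(byte_size(object), 128, fetch_range, 4)
IO.puts("reassembled correctly: #{copy == object}")
```

The chunk size and concurrency are the knobs to experiment with; the values here are arbitrary.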

That number is what it showed when I tried fetching files via wget, so I assume it’s megabytes.

Sorry, that’s 25 gigabit / 96 cores, which works out to about 260 megabits (roughly 32 megabytes) per second per worker.

Didn’t realize I had it, but here is NetworkIn from CloudWatch.

It shouldn’t be. Here’s a modified version of my processing with the logic parts (maps & reduce) removed.

    temp_file
    |> File.stream!([], 1024 * 1024 * 10)
    |> StreamGzip.gunzip()
    |> Stream.concat([:end])
    |> Stream.transform("", fn
      :end, prev ->
        {[prev], ""}

      chunk, prev ->
        [last_line | lines] =
          String.split(prev <> chunk, "\n")
          |> Enum.reverse()

        {Enum.reverse(lines), last_line}
    end)
    |> Flow.from_enumerable(stages: 50)
    |> Flow.map(&parse_record/1)
    |> Flow.partition(stages: 1)
    |> Flow.reduce(fn -> %{} end, fn _record, map ->
      # aggregate the record into the map (logic removed here)
      map
    end)
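
The chunk-to-line step in the pipeline above can be tried on its own. This is a small standalone variant of the same Stream.transform technique, not the exact code from the post: `ChunkLines` is a made-up name, and unlike the original it drops the empty trailing string when the input ends with a newline.

```elixir
defmodule ChunkLines do
  # Turns a stream of arbitrary binary chunks into a stream of lines,
  # carrying the unfinished tail of each chunk over to the next one.
  def lines(chunks) do
    chunks
    |> Stream.concat([:end])
    |> Stream.transform("", fn
      # End of input: flush whatever is left, unless it is empty.
      :end, "" ->
        {[], ""}

      :end, rest ->
        {[rest], ""}

      chunk, rest ->
        # Everything except the last piece is a complete line.
        {complete, [tail]} =
          (rest <> chunk)
          |> String.split("\n")
          |> Enum.split(-1)

        {complete, tail}
    end)
  end
end

["ab\ncd", "ef\n", "gh"]
|> ChunkLines.lines()
|> Enum.each(&IO.puts/1)
```

The line "cdef" spans two chunks and comes out whole, which is the property the Flow stages downstream rely on.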

I think you’re pretty much saturating your network connection.

Taking @benwilson512’s approach, I figured out one bug, though it wasn’t in “my” code. Using SQS send-message instead of send-message-batch meant I wasn’t delivering work fast enough. With that cleared up, I’m still bottlenecked at 70% CPU using ExAws (going to try soon with presigned links to see if it’s any better).

I don’t think so. There’s plenty of network bandwidth left from what I can tell.

25,000,000,000 bits per second / 8 = 3,125,000,000 bytes per second
3,125,000,000 bytes per second * 60 seconds = 187,500,000,000 bytes per minute.

Here’s a run on just one machine. There is 185GB of input, so it should be able to download it all within 1 minute according to the above calculation.

See the CPU is up now (all percentiles are on top of each other), but NetworkIn is still super low, topping out at 22GB. Is there something I’m missing or is that calculation wrong for some reason?
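
For comparison, here is the same link calculation as a small sketch, extended to express an observed figure as a fraction of the theoretical maximum. It assumes the 22GB peak is a per-minute total, which depends on the CloudWatch period used for the graph, and it ignores protocol overhead. `LinkMath` is a made-up module name.

```elixir
defmodule LinkMath do
  # Theoretical bytes per minute for a link rated in gigabits per second.
  def bytes_per_minute(gbit_per_sec) do
    div(gbit_per_sec * 1_000_000_000, 8) * 60
  end

  # Fraction of the link in use, given observed bytes per minute.
  def utilization(observed_bytes_per_min, gbit_per_sec) do
    observed_bytes_per_min / bytes_per_minute(gbit_per_sec)
  end
end

max = LinkMath.bytes_per_minute(25)
used = LinkMath.utilization(22_000_000_000, 25)
IO.puts("theoretical max: #{max} bytes per minute")
IO.puts("observed share of link: #{Float.round(used * 100, 1)}%")
```

Under that assumption, 22GB per minute is about 11.7% of the 187,500,000,000 bytes per minute the link could carry.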

Your math there looks right, so you’re saying you see a peak of 22GB/min? I think I got confused in your prior post re GB/min vs Gb/sec.

I’m confused by all the G/gb/GB mb/MB etc being thrown around… maybe just stick to one unit, or make sure they are used correctly…

Just wanted to point out that storage speed might be a/the bottleneck as well… the m5.24xlarge is EBS-only (no local instance storage), so since this instance has plenty of memory, maybe go straight to memory and not to disk/EBS…

(even if you use an instance with dedicated ssd, I doubt it can hold up to ingressing 25 Gbit straight to disk…)
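
As a rough illustration of staying in memory, this sketch gunzips a stream of downloaded chunks with Erlang’s built-in :zlib instead of writing a temp file first. It is not the code from the PR mentioned earlier in the thread, and `MemoryGunzip` is a made-up name. The chunk source here is an in-memory binary; in a real system it would be the HTTP response body arriving piece by piece.

```elixir
defmodule MemoryGunzip do
  # 16 + 15 tells zlib to expect a gzip header with the maximum window size.
  @gzip_window_bits 16 + 15

  # Lazily inflates a stream of gzip-compressed binary chunks.
  def stream(chunks) do
    Stream.transform(
      chunks,
      fn ->
        z = :zlib.open()
        :ok = :zlib.inflateInit(z, @gzip_window_bits)
        z
      end,
      fn chunk, z -> {[IO.iodata_to_binary(:zlib.inflate(z, chunk))], z} end,
      fn z -> :zlib.close(z) end
    )
  end
end

original = String.duplicate("a line of text\n", 1_000)
compressed = :zlib.gzip(original)

# Cut the compressed binary into 64-byte pieces to imitate network chunks.
chunks =
  Stream.unfold(compressed, fn
    <<>> -> nil
    <<part::binary-size(64), rest::binary>> -> {part, rest}
    rest -> {rest, <<>>}
  end)

restored = chunks |> MemoryGunzip.stream() |> Enum.join()
IO.puts("round trip ok: #{restored == original}")
```

The zlib handle belongs to the process that consumes the stream, so each worker would need its own.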
