Hi All,
I am prototyping a system that uses Broadway to consume SQS messages and then Flow to process each message. Each message is an S3 path to a large file: 200 to 400 MB of gzip-compressed data, typically expanding to 1 to 2 GB of text.
I’m using a few very large instances to process this data, but I’m unable to get Elixir to use all of the CPU. Each worker uses its own hackney pool, so they should not be competing. Here’s what I have configured for the default pool:
follow_redirect: true,
# Must be longer than long-poll setting for SQS (10 seconds)
recv_timeout: 30_000,
timeout: 300_000,
checkout_timeout: 30_000,
max_connections: 50
I am using the following Broadway configuration (though I have tried many variants of this):
Broadway.start_link(__MODULE__,
  name: __MODULE__,
  producers: [
    default: [
      module:
        {BroadwaySQS.Producer,
         queue_name: "my-queue",
         max_number_of_messages: 10,
         wait_time_seconds: 4,
         visibility_timeout: 300,
         receive_interval: 10},
      stages: 5
    ]
  ],
  processors: [
    default: [
      max_demand: 1,
      stages: System.schedulers_online()
    ]
  ],
  batchers: [
    default: [
      batch_size: 10,
      batch_timeout: 10_000,
      stages: 5
    ]
  ]
)
I have the following in my rel/vm.args to try to remove any VM-level I/O bottleneck, without any luck:
## Enable kernel poll and a few async threads
##+K true
+A 1024
## For OTP 21+, the +A flag is no longer used for file I/O;
## +SDio replaces it by running file I/O on dirty IO schedulers
+SDio 1024
# Allow up to 10 million processes
+P 10000000
## Increase number of concurrent ports/sockets
-env ERL_MAX_PORTS 4096
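To confirm that these flags actually reached the running node, it may help to read the corresponding values back from a remote console. This is a minimal sketch using `:erlang.system_info/1`; the module name `VmFlags` is hypothetical.

```elixir
defmodule VmFlags do
  # Reads back the runtime values that the vm.args flags above are meant to set.
  def report do
    %{
      schedulers_online: :erlang.system_info(:schedulers_online),
      # +SDio: number of dirty I/O schedulers
      dirty_io_schedulers: :erlang.system_info(:dirty_io_schedulers),
      # +A: size of the async thread pool
      thread_pool_size: :erlang.system_info(:thread_pool_size),
      # +P: maximum number of simultaneous processes
      process_limit: :erlang.system_info(:process_limit),
      # ERL_MAX_PORTS / +Q: maximum number of simultaneous ports
      port_limit: :erlang.system_info(:port_limit)
    }
  end
end

VmFlags.report() |> IO.inspect()
```

The runtime may choose a `process_limit` larger than the `+P` value passed, so that number is not expected to match exactly.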
In some areas I seem to have more luck with presigned URLs than with ExAws, but neither performs as well as I’d like.
Here is a screenshot of CPU utilization for 3 instances. I’ve tried creating more processes, and I’ve tried running on just a single instance, but I can never get CPU utilization above 60-75%.
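Comparing the OS graph with the runtime’s own view of scheduler utilization could show whether the BEAM schedulers are mostly idle (work waiting on downloads or demand) or genuinely busy. A minimal sketch using `:scheduler.utilization/1` from `runtime_tools` (OTP 21+); `SchedulerProbe` is a hypothetical helper name.

```elixir
defmodule SchedulerProbe do
  # Samples scheduler utilization over `seconds` with :scheduler.utilization/1
  # (runtime_tools, OTP 21+). Returns {total, per_type}, where per_type maps each
  # scheduler type (:normal, :cpu, :io) to its average utilization as a float.
  def sample(seconds \\ 1) do
    samples = :scheduler.utilization(seconds)

    # The overall figure is a 3-tuple {:total, util, percent_string}.
    {:total, total, _percent} = List.keyfind(samples, :total, 0)

    per_type =
      samples
      # Per-scheduler entries are 4-tuples {type, scheduler_id, util, percent_string}.
      |> Enum.filter(&match?({_type, _id, _util, _percent}, &1))
      |> Enum.group_by(fn {type, _, _, _} -> type end, fn {_, _, util, _} -> util end)
      |> Map.new(fn {type, utils} -> {type, Enum.sum(utils) / length(utils)} end)

    {total, per_type}
  end
end

SchedulerProbe.sample(1) |> IO.inspect()
```

If `:normal` utilization stays well below 1.0 while messages are in flight, the limit is more likely I/O or demand than CPU. In a Mix release, `:runtime_tools` may need to be listed in `extra_applications` for the `:scheduler` module to be available.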
For reference, here is my presigned URL processing function which produced the above graph:
(NOTE: I’m only downloading to disk because streaming into memory performs even worse (0-1% CPU) with the code I wrote in this PR, though I can find nothing wrong with it.)
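For the streaming-into-memory path, one way to keep memory flat is to inflate the gzip data incrementally, chunk by chunk, instead of buffering the whole body. Below is a minimal sketch using Erlang’s built-in `:zlib` and `Stream.transform/4`; the module name `Gunzip` is hypothetical, it assumes a single-member gzip file, and it is not the code from the PR mentioned above.

```elixir
defmodule Gunzip do
  # Lazily inflates an enumerable of gzip-compressed binary chunks (e.g. from
  # File.stream!/3 or a chunked HTTP body) into decompressed binaries, holding
  # only the current chunk and zlib's internal state in memory.
  def stream(compressed_chunks) do
    Stream.transform(
      compressed_chunks,
      fn ->
        z = :zlib.open()
        # windowBits 31 (16 + 15) makes zlib expect and strip a gzip header/trailer.
        :ok = :zlib.inflateInit(z, 31)
        z
      end,
      fn chunk, z ->
        # inflate/2 returns iodata; flatten it to one binary per input chunk.
        {[IO.iodata_to_binary(:zlib.inflate(z, chunk))], z}
      end,
      # close/1 releases the zlib stream even if the consumer halts early.
      fn z -> :zlib.close(z) end
    )
  end
end
```

Because it only needs an enumerable of binaries, the same function could wrap `File.stream!(temp_filename, [], @read_chunk_size)` or a chunked HTTP response. Note that `:zlib.inflate/2` returns everything a chunk expands to at once, so with 10 MB compressed chunks each emitted binary may be several times larger.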
# Logger macros such as Logger.info/1 require the module to `require Logger`.
require Logger

@read_chunk_size 1024 * 1024 * 10

def process("https://" <> _ = presigned_url, pool_name) do
  temp_filename = "/tmp/#{pool_name}.tmp"
  # OK if the file doesn't exist yet
  _ = File.rm(temp_filename)

  {microseconds, _} =
    :timer.tc(fn ->
      Download.from(presigned_url, path: temp_filename, http_opts: [hackney: [pool: pool_name]])
    end)

  _ = Logger.info("File #{temp_filename} downloaded in #{microseconds / 1_000_000} seconds")

  result =
    temp_filename
    |> File.stream!([], @read_chunk_size)
    |> file_processing_stream()
    |> reduce_results()
    |> Enum.into(%{})

  File.rm!(temp_filename)
  result
end
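`File.stream!(temp_filename, [], @read_chunk_size)` yields fixed-size 10 MB byte chunks, so a line of text can be split across two chunks. `file_processing_stream/1` isn’t shown, so this may already be handled; if the data is newline-delimited, a sketch like the following (hypothetical module `ChunkLines`) re-joins partial lines before they reach the parser.

```elixir
defmodule ChunkLines do
  # Re-splits arbitrary binary chunks into complete lines (without the "\n"),
  # carrying any partial trailing line over to the next chunk.
  def stream(chunks) do
    chunks
    # Sentinel so the final partial line (no trailing "\n") is still emitted.
    |> Stream.concat([:eof])
    |> Stream.transform("", fn
      :eof, "" ->
        {[], ""}

      :eof, partial ->
        {[partial], ""}

      chunk, partial ->
        # The last element after splitting is either "" or an incomplete line.
        [last | complete] = (partial <> chunk) |> String.split("\n") |> Enum.reverse()
        {Enum.reverse(complete), last}
    end)
  end
end
```

An alternative is `File.stream!(temp_filename)` in its default `:line` mode, which yields whole lines directly, at the cost of many more, smaller elements.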
Download times also increase significantly under load, from 3-5 seconds to two or three times that.
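In the configuration above, each processor appears to both download and parse a file, so network waits and CPU work share one concurrency limit. One thing that may be worth trying is bounding the two steps separately, so that download contention doesn’t dictate how many CPU-bound parses run at once. The sketch below illustrates the idea with `Task.async_stream/3` from the standard library rather than Broadway; `TwoStage`, `fetch_fun`, `parse_fun` and the limit arguments are hypothetical, and whether it helps in this case is untested.

```elixir
defmodule TwoStage do
  # Runs fetch_fun with at most `download_limit` concurrent tasks and parse_fun
  # with at most `parse_limit` concurrent tasks. Both stages are lazy and
  # ordered, so results come back in input order.
  def run(items, fetch_fun, parse_fun, download_limit, parse_limit) do
    items
    |> Task.async_stream(fetch_fun, max_concurrency: download_limit, timeout: :infinity)
    |> Stream.map(fn {:ok, downloaded} -> downloaded end)
    |> Task.async_stream(parse_fun, max_concurrency: parse_limit, timeout: :infinity)
    |> Enum.map(fn {:ok, result} -> result end)
  end
end
```

In practice `download_limit` might be set to whatever the network and hackney pools sustain without download times climbing, and `parse_limit` near `System.schedulers_online()`; both would need measuring.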