I was using Task.Supervisor.async_stream_nolink/4 to perform an operation on each object in an S3 bucket. This worked for about 12 hours, and then my AWS token expired. Not a big deal… I needed a local CSV file listing the keys anyway, so I used the same Task.Supervisor.async_stream_nolink/4 pattern to grab all the keys and create an index file.
However, when I ran Task.Supervisor.async_stream_nolink/4 over the CSV, using File.stream!/2 to read that file, the stream suddenly started timing out. When I was streaming objects from S3, the timeout applied (I thought) to each individual S3 operation. Now that I am streaming lines from a local file, the timeout seems to limit the time allowed for the entire stream to complete. I think I'm missing something obvious, and I'm hoping someone can point out where I went off the rails.
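For reference, here is a minimal sketch of the per-task behaviour that the Task docs describe for the :timeout option. It does not touch S3: the anonymous supervisor, the 50 ms sleep, and the 200 ms timeout are assumptions chosen only for illustration. If the timeout covered the whole stream, this run (which takes at least 500 ms in total) would not be expected to finish.

```elixir
# Anonymous task supervisor, a stand-in for MyApp.TaskSupervisor (assumption).
{:ok, sup} = Task.Supervisor.start_link()

timeout_ms = 200

{elapsed_us, results} =
  :timer.tc(fn ->
    sup
    |> Task.Supervisor.async_stream_nolink(
      1..20,
      fn n ->
        # Each task takes about 50 ms, well under the 200 ms timeout.
        Process.sleep(50)
        n
      end,
      timeout: timeout_ms,
      max_concurrency: 2
    )
    |> Enum.to_list()
  end)

elapsed_ms = div(elapsed_us, 1000)
IO.puts("tasks finished: #{length(results)}, total time: #{elapsed_ms} ms")
```

With 20 tasks running two at a time, the whole stream takes longer than the timeout, yet every element should come back as an {:ok, n} tuple, because no single task exceeds 200 ms.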
The initial stream (the one that was working) was structured like this (and yes, it does more or less the same thing as an aws s3 sync command):
def cp_local(src_prefix, target_dir, max_concurrency \\ 1000) do
  bucket = "my-bucket"
  File.mkdir_p!(target_dir)

  Task.Supervisor.async_stream_nolink(
    MyApp.TaskSupervisor,
    # Lazily list every object under the prefix
    bucket
    |> ExAws.S3.list_objects_v2(prefix: src_prefix)
    |> ExAws.stream!(),
    fn %{key: key} ->
      case get_object(bucket, key) do
        {:ok, %{body: body}} ->
          # Path.join/2 adds the separator if target_dir has no trailing slash
          File.write!(Path.join(target_dir, Path.basename(key)), body)

        {:error, error} ->
          # inspect/1 so that non-string error terms can be logged
          Logger.error(inspect(error))
      end
    end,
    timeout: 40_000,
    max_concurrency: max_concurrency
  )
  |> Stream.run()
end
That ran for hours, so the :timeout option really did seem to apply to each individual S3 operation.
The second variant looked like this:
def cp_from_index(index_file, target_dir, max_concurrency \\ 1000) do
  bucket = "my-bucket"
  File.mkdir_p!(target_dir)
  IO.puts("Starting at: #{DateTime.utc_now()}")

  Task.Supervisor.async_stream_nolink(
    MyApp.TaskSupervisor,
    # One S3 key per line of the local index file
    File.stream!(index_file),
    fn line ->
      key = String.trim(line)

      case get_object(bucket, key) do
        {:ok, %{body: body}} ->
          # Path.join/2 adds the separator if target_dir has no trailing slash
          File.write!(Path.join(target_dir, Path.basename(key)), body)

        {:error, error} ->
          # inspect/1 so that non-string error terms can be logged
          Logger.error(inspect(error))
      end
    end,
    timeout: 40_000,
    max_concurrency: max_concurrency
  )
  |> Stream.run()

  IO.puts("Finishing at: #{DateTime.utc_now()}")
end
And as soon as it had run for 40 seconds (the timeout), it stopped. What did I miss?
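One way to see which tasks are hitting the timeout, rather than having the caller exit, might be the documented on_timeout: :kill_task option. The sketch below is only an illustration under assumed values: the sleeps stand in for S3 calls, the supervisor is anonymous, and the 100 ms timeout is arbitrary.

```elixir
# Anonymous task supervisor, a stand-in for MyApp.TaskSupervisor (assumption).
{:ok, sup} = Task.Supervisor.start_link()

results =
  sup
  |> Task.Supervisor.async_stream_nolink(
    # Hypothetical workloads: one fast task (10 ms) and one slow task (500 ms).
    [10, 500],
    fn ms ->
      Process.sleep(ms)
      ms
    end,
    timeout: 100,
    # Kill only the task that timed out and emit {:exit, :timeout} for it,
    # instead of the default (:exit), which exits the calling process.
    on_timeout: :kill_task
  )
  |> Enum.to_list()

timed_out = Enum.count(results, &match?({:exit, :timeout}, &1))
IO.puts("#{timed_out} of #{length(results)} tasks timed out")
```

Counting the {:exit, :timeout} entries this way would show whether a handful of slow tasks or the whole batch is exceeding the limit.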
Thanks in advance!