Stream times out: Task.Supervisor.async_stream_nolink/4 over File.stream!/2 on a CSV file

I was using Task.Supervisor.async_stream_nolink/4 to perform an operation on each object in an S3 bucket. This worked for about 12 hours and then my AWS token expired. Not a big deal… I needed a local CSV file to list the keys anyway, so I used the same Task.Supervisor.async_stream_nolink/4 pattern to grab all the keys and create an index file.

However, when I tried to use Task.Supervisor.async_stream_nolink/4 over a File.stream!/2 of that CSV, the stream suddenly timed out. When I was streaming objects from S3, the timeout applied (I thought) to each individual S3 operation I was performing. When I stream lines from a local file, the timeout seems to cover the time allowed for the entire stream to complete. I think I'm missing something obvious and I'm hoping someone can point out where I went off the rails.

The initial stream (the one that was working) was structured like this (and yes, it more or less does the same as an aws sync command):

  def cp_local(src_prefix, target_dir, max_concurrency \\ 1000) do
    bucket = "my-bucket"
    File.mkdir_p!(target_dir)

    Task.Supervisor.async_stream_nolink(
      MyApp.TaskSupervisor,
      bucket
      |> ExAws.S3.list_objects_v2(prefix: src_prefix)
      |> ExAws.stream!(),
      fn %{key: key} ->
        case get_object(bucket, key) do
          {:ok, %{body: body}} ->
            File.write!(target_dir <> Path.basename(key), body)

          {:error, error} ->
            Logger.error(error)
        end
      end,
      timeout: 40_000,
      max_concurrency: max_concurrency
    )
    |> Stream.run()
  end

That ran for hours; the :timeout option really seemed to apply to each individual S3 operation.
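As a sanity check of that assumption, a toy stream like the sketch below (same MyApp.TaskSupervisor, everything else hypothetical) should run to completion even though the whole run takes far longer than the timeout, because each individual task finishes well inside it:

# Minimal sketch: 50 tasks, each sleeping 100 ms, with a 500 ms per-task timeout.
# The whole run takes roughly 5 s with max_concurrency: 1, but nothing times out,
# because :timeout is measured against each task, not the stream as a whole.
Task.Supervisor.async_stream_nolink(
  MyApp.TaskSupervisor,
  1..50,
  fn i ->
    Process.sleep(100)
    i
  end,
  timeout: 500,
  max_concurrency: 1
)
|> Stream.run()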

The 2nd variant looked like this:

  def cp_from_index(index_file, target_dir, max_concurrency \\ 1000) do
    bucket = "my-bucket"
    File.mkdir_p!(target_dir)
    IO.puts("Starting at: #{DateTime.utc_now()}")

    Task.Supervisor.async_stream_nolink(
      MyApp.TaskSupervisor,
      File.stream!(index_file),
      fn line ->
        key = String.trim(line)

        case get_object(bucket, key) do
          {:ok, %{body: body}} ->
            File.write!(target_dir <> Path.basename(key), body)

          {:error, error} ->
            Logger.error(error)
        end
      end,
      timeout: 40_000,
      max_concurrency: max_concurrency
    )
    |> Stream.run()

    IO.puts("Finishing at: #{DateTime.utc_now()}")
  end

And as soon as it ran for 40 seconds (the timeout), it stopped. What did I miss?

Thanks in advance!


The timeout is indeed enforced per-task.

However, the default setting for the :on_timeout option in the async_stream functions is :exit, so if any one of the max_concurrency tasks times out after 40s, the process that called cp_from_index will exit.

You likely want the alternative :kill_task value for :on_timeout, which keeps things going and emits a {:exit, :timeout} for the task that timed out.
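Something along these lines (a sketch only; process_line/1 stands in for the get_object/File.write! body of cp_from_index/3 above, and it assumes the same require Logger as that module) would keep the stream alive and surface the timeouts in the results:

# Sketch: same shape as cp_from_index/3, with on_timeout: :kill_task added.
# Timed-out tasks are killed and emitted as {:exit, :timeout}; successful ones as {:ok, result}.
Task.Supervisor.async_stream_nolink(
  MyApp.TaskSupervisor,
  File.stream!(index_file),
  fn line -> process_line(line) end,
  timeout: 40_000,
  on_timeout: :kill_task,
  max_concurrency: max_concurrency
)
|> Enum.each(fn
  {:ok, _result} -> :ok
  {:exit, :timeout} -> Logger.warning("object download timed out after 40s")
  {:exit, reason} -> Logger.error("task exited: #{inspect(reason)}")
end)

Swapping Stream.run/1 for Enum.each/2 here is just to make the {:ok, _} and {:exit, :timeout} tuples visible; Stream.run/1 would also work once :kill_task stops a timeout from exiting the caller.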


Thank you, Matt, for your sage advice. Appreciated as always.

Any idea why this might cause the shell process to crash?

iex(jester@ip-10-0-33-167)2> MyApp.cp_from_index("/path/to/s3_keys.csv", "/target/dir/", 5000)
Starting at: 2023-01-13 11:33:44.183873Z
*** ERROR: Shell process terminated! (^G to start new job) ***

11:45:52.782 [error] ** Node :"my-app@ip-10-0-33-167" not responding **
** Removing (timedout) connection **

I put in a line that touches a file so I can see that the underlying Erlang process is still running, but I didn't see any errors anywhere (any errors encountered were written to a separate file).