Is Elixir suitable for downloading multiple files simultaneously?

I have an app that downloads lots of large files from S3 throughout the day. I’m using ibrowse and streaming the files to disk. This works well, but cpu usage is pretty high when 5+ files are downloading at once. I know elixir isn’t the best choice for computation heavy tasks, but am I running into the same limitation with file downloads in the sense that the constant stream of messages overloads the cpu?

What are you using to download? I use gen_tcp directly and can ingest 100MB/s easily on low end PC, although I buffer the IO a lot. It’s definitely something with your download logic.

2 Likes

I’m using ibrowse with async requests. Here’s what the downloader module looks like. It’s called inside Task.async_stream.

defmodule IbrowseDownloader do
  require Logger
  alias Downloader.Utils

  @doc false
  def download(url, destination, filename, request_headers) do
    config = %{
        url: url,
        destination: destination,
        filename: filename,
        request_headers: request_headers,
        id: nil,
        file: nil,
      }
    with {:ibrowse_req_id, id} <- send_request(config),
         {:ok, file} <- handle_response(%{config | id: id}) do
      {:ok, file}
    end
  end

  @doc false
  def send_request(config) do
    config.url
    |> to_charlist()
    |> :ibrowse.send_req(config.request_headers, :get, [], [
      {:stream_to, self()},
      {:connect_timeout, 60_000},
      {:inactivity_timeout, 60_000},
      {:max_sessions, 1000},
      {:max_pipeline_size, 100_000},
    ], 120 * 60 * 1000)
  end

  @doc false
  def handle_response(config) do
    id = config.id
    receive do
      {:ibrowse_async_headers, ^id, '200', headers} ->
        Logger.debug("Received 200 status")
        {:ok, filename} = Utils.filename(headers, config.url, config.filename)
        {:ok, file} = Utils.create_file(config.destination, filename)
        handle_response(%{config | filename: filename, file: file})

      {:ibrowse_async_headers, _, '302', headers} ->
        Logger.debug("Received 302 status")
        [{'Location', redirect_url}] = Enum.filter(headers, fn({name, _value}) -> name == 'Location' end)
        {:ibrowse_req_id, id} = send_request(%{config | url: redirect_url})
        handle_response(%{config | url: redirect_url, id: id})

      {:ibrowse_async_headers, ^id, status, _headers} ->
        Logger.debug("Received #{status} status")
        {:error, status}

      {:ibrowse_async_response, ^id, {:error, :connection_closed}} ->
        Logger.error("Received :connection_closed")
        {:error, :connection_closed}

      {:ibrowse_async_response, ^id, {:error, :req_timedout}} ->
        Logger.error("Received :req_timedout")
        {:error, :req_timedout}

      {:ibrowse_async_response_timeout, ^id} ->
        Logger.error("Received ibrowse_async_response_timeout")
        {:error, :timeout}

      {:ibrowse_async_response, ^id, chunk} ->
        # Logger.debug("Received chunk. size #{length(chunk)}")
        IO.binwrite(config.file, chunk)
        handle_response(config)

      {:ibrowse_async_response_end, ^id} ->
        Logger.debug("Received end")
        File.close(config.file)
        file = Path.join([config.destination, config.filename])
        {:ok, file}
    end
  end

end
1 Like

Is the result of IbrowseDownloader.download/4 the entire file contents? And are you sending those back to the process that originated the task before storing it to disk? If so that’s a lot of unnecessary messaging overhead and you’d be better off doing the entire “task” within a the Task. So download and store to disk all within the same task as one unit of work. If that’s not it, then it would be helpful to see your code that creates the tasks.

1 Like

The return result is just an {:ok, file} tuple containing the full path to the downloaded file. The entire body of work is being done inside Task.async_stream. This is the code that initiates the downloads.

Task.async_stream(files, fn {url, destination, filename} ->
  IbrowseDownloader.download(url, destination, filename, [@auth_header])
end, timeout: :infinity, max_concurrency: 20)
|> Enum.to_list()

I’ve never used ibrowse before. Can you log how often you are processing response? I think you need better rate control over how often you are writing to disk, the buffer size depends on hardware but in general try 64KB or larger. ibrowse should have something to let you control stream rate because gen_tcp allows that, like :inet.setopts(state.socket, active: 1)

I also haven’t used ibrowse but I’ve used httpotion several times with Task.async_stream and have been able to download 200 files sumultaneously for hours at a time (and store them to an NFS volume, all on a small VPS: 256MB of RAM) and when me and a colleague watched it remotely with :observer the CPU was getting very slightly excited – 7-8% – with rare spikes to 15% (I am guessing garbage collector kicking in).

But I will agree with @xlphs – when in doubt about if the network is causing you problems, always reach for :gen_tcp first. It gives you 99% clear experience and if everything works well in that code then you either keep it and use it, or start making another module that uses a higher-level library and gradually isolate the problem.

2 Likes

I found what was causing my issues. One of the async responses I was getting was {:ibrowse_async_response, id, {:error, :req_timedout}} which matched the {:ibrowse_async_response, ^id, chunk} clause and thus called IO.binwrite with {:error, :req_timedout}. Once I added a clause to handle this and return an error tuple instead of calling IO.binwrite, cpu usage has dropped drastically. Thanks for the help everyone!

2 Likes

Nice catch! Good job. Glad you solved it.