Upload to S3 from Google Drive by downloading the file in chunks and uploading it to S3 at the same time

Hello everyone.

I’m working on an application that lists all your Google Drive files and allows you to upload them to our app (which uses S3).

Unfortunately, waffle_ecto can’t upload a file from the URL that Drive returns, even with the option allow_urls: true. The only approach that worked for me is to download the file to a temporary folder first and then upload it to S3. Is it possible to skip the temporary folder and upload the file while it is being downloaded?

Well, I’ve tried to do it, but I failed. The async_download/2 function is taken from the Poeticoding article “Download Large Files with HTTPoison Async Requests” by @alvises, and it works perfectly. I believe the problem is the chunk param I pass to ExAws.S3.put_object/3 inside the upload_chunk_to_s3/2 function. Currently, chunk is iodata. I’ve also tried converting it to a binary with IO.iodata_to_binary/1 and encoding it to Base64 with Base.encode64/2, but both attempts failed.

Here is the code involved:

defmodule GoogleDriveApi do
  @moduledoc """
  Google Drive API module.
  """

  @base_url "https://www.googleapis.com/drive/v3"

  @doc """
  Downloads a single file from Google Drive API.
  """
  def download_file(access_token, file, filename) do
    url = @base_url <> "/files/" <> file["id"]

    headers = [
      Authorization: "Bearer #{access_token}",
      Accept: "Application/json; Charset=utf-8"
    ]

    options = [
      params: [
        alt: "media"
      ],
      stream_to: self(),
      async: :once
    ]

    with {:ok, resp} <- HTTPoison.get(url, headers, options),
         :ok <- async_download(resp, filename) do
      {:ok, ""}
    end
  end

  defp async_download(resp, filename) do
    resp_id = resp.id

    receive do
      %HTTPoison.AsyncStatus{code: 200, id: ^resp_id} ->
        HTTPoison.stream_next(resp)
        async_download(resp, filename)

      %HTTPoison.AsyncStatus{code: status_code, id: ^resp_id} ->
        IO.inspect(status_code)

      %HTTPoison.AsyncHeaders{headers: _headers, id: ^resp_id} ->
        HTTPoison.stream_next(resp)
        async_download(resp, filename)

      %HTTPoison.AsyncChunk{chunk: chunk, id: ^resp_id} ->
        upload_chunk_to_s3(filename, chunk)

        HTTPoison.stream_next(resp)

        async_download(resp, filename)

      %HTTPoison.AsyncEnd{id: ^resp_id} ->
        :ok
    end
  end

  defp upload_chunk_to_s3(filename, chunk) do
    IO.puts "UPLOAD CHUNK TO S3"

    path_to_s3 = "tmp/" <> filename
    
    get_s3_bucket()
    |> ExAws.S3.put_object(path_to_s3, chunk)
    |> ExAws.request!
  end  
end

One thing I forgot to mention: I saw some questions on Stack Overflow that solve this with JavaScript and Buffers. Maybe it’s not possible on the server side and I have to try it on the client side?

Any advice would be appreciated!

Cheers,
Agustín Silva.

put_object definitely expects a binary in that position (it will try to JSON-encode an array), so iodata_to_binary would be necessary.

Can you describe what “failed” in that case in more detail?

One thing I’m not sure about: put_object does not append, it overwrites the object with the new data. But upload_chunk_to_s3 always uses the same value in path_to_s3 :thinking:
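
For reference, the conversion mentioned above is a single call, and its arity is 1. A minimal sketch with a made-up chunk (the real chunk from HTTPoison may already be a plain binary, in which case the call simply returns it unchanged):

```elixir
# A made-up chunk shaped like iodata: nested lists of binaries and bytes.
chunk = ["PK", [3, 4], <<0, 1>>, ["rest", " of ", "chunk"]]

# IO.iodata_to_binary/1 flattens it into one contiguous binary.
binary = IO.iodata_to_binary(chunk)

# IO.iodata_length/1 gives the same size without building the binary.
IO.inspect(byte_size(binary) == IO.iodata_length(chunk))
```

Converting each chunk only addresses the body type; it does not change the fact that every put_object call to the same key replaces the previous object.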

The failure was that the whole file never reached S3 while it was being downloaded. For example, when I tried a 2.7 MB file, the object that ended up on S3 was only about 1.7 KB. It seems that only the first call to ExAws.S3.put_object/3 worked.

Based on this Stack Overflow post:

S3 doesn’t have an “append” operation. Once an object has been uploaded, there is no way to modify it in place; your only option is to upload a new object to replace it.

So, the only way is to store it in a temporary directory on the server and then upload it to S3, right?

You need to use S3 multipart upload but pay attention to the maximum chunk count and minimum chunk size.

I solved the problem by generating a state token which is continuously exchanged for part signatures (incrementally, to allow client uploads).

Should be easier if you were doing it server-side.

https://docs.aws.amazon.com/AmazonS3/latest/userguide/mpuoverview.html
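
To make the size constraint concrete: S3 requires every part except the last to be at least 5 MiB, and allows at most 10,000 parts per upload, while the chunks coming out of an HTTP client are usually much smaller. Below is a minimal sketch of one way to regroup small chunks into large enough parts. The PartBuffer module and its rechunk function are hypothetical names made up for this illustration, not part of ExAws or HTTPoison.

```elixir
defmodule PartBuffer do
  @moduledoc """
  Hypothetical helper: regroups a stream of small binaries into parts of at
  least `min_bytes`, so every part except possibly the last one meets the
  S3 minimum part size.
  """

  # S3 minimum size for every part except the last one.
  @min_part_size 5 * 1024 * 1024

  def rechunk(chunks, min_bytes \\ @min_part_size) do
    Stream.chunk_while(
      chunks,
      # Accumulator: iodata collected so far and its size in bytes.
      {[], 0},
      fn chunk, {acc, size} ->
        acc = [acc, chunk]
        size = size + IO.iodata_length(chunk)

        if size >= min_bytes do
          # Enough data buffered: emit one part and start a new buffer.
          {:cont, IO.iodata_to_binary(acc), {[], 0}}
        else
          {:cont, {acc, size}}
        end
      end,
      fn
        # Nothing left over: emit nothing.
        {_acc, 0} -> {:cont, {[], 0}}
        # Leftover data becomes the (possibly short) final part.
        {acc, _size} -> {:cont, IO.iodata_to_binary(acc), {[], 0}}
      end
    )
  end
end

# Tiny demo with a 4-byte minimum so the grouping is easy to see.
parts = ["aa", "bbb", "c", "dddd", "e"] |> PartBuffer.rechunk(4) |> Enum.to_list()
IO.inspect(parts)
```

With 5 MiB parts, the 10,000-part limit caps an object at roughly 48.8 GiB, so larger files would need a bigger part size.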

I’ve made some progress. If a file is <= 5 MB, I convert the iodata to a binary with IO.iodata_to_binary/1 and pass it to put_object. This works fine. The only thing that remains is uploading files larger than 5 MB using multipart upload.

  def download_file(access_token, file, filename) do
    url = @base_url <> "/files/" <> file["id"]

    headers = set_headers(access_token)

    options = [
      params: [
        alt: "media"
      ]
    ]

    if String.to_integer(file["size"]) > @filesize_limit_in_bytes do
      async_download(url, headers, options, filename)
    else
      sync_download(url, headers, options)
    end
  end

  def async_download(url, headers, options, filename) do
    options = options ++ [stream_to: self(), async: :once]

    with {:ok, resp} <- HTTPoison.get(url, headers, options) do
      do_async_download(resp, filename)
    end
  end

  def sync_download(url, headers, options) do
    {:ok, body} = get(url, headers, options)
    {:ok, IO.iodata_to_binary(body)}
  end

  defp set_headers(access_token) do
    [
      Authorization: "Bearer #{access_token}",
      Accept: "Application/json; Charset=utf-8"
    ]
  end

  defp do_async_download(resp, filename) do
    resp_id = resp.id

    receive do
      %HTTPoison.AsyncStatus{code: status_code, id: ^resp_id} ->
        IO.inspect(status_code)
        HTTPoison.stream_next(resp)
        do_async_download(resp, filename)

      %HTTPoison.AsyncHeaders{headers: headers, id: ^resp_id} ->
        IO.inspect(headers)
        HTTPoison.stream_next(resp)
        do_async_download(resp, filename)

      %HTTPoison.AsyncChunk{chunk: chunk, id: ^resp_id} ->
        #upload_chunk_to_s3(filename, chunk)
        HTTPoison.stream_next(resp)
        do_async_download(resp, filename)

      %HTTPoison.AsyncEnd{id: ^resp_id} ->
        {:ok, ""}
    end
  end

Thanks for the advice.

Did you use ExAws for that or you just had to do it manually with some HTTP client? Also, I noticed that you mentioned the same here Packmatic — On-the-fly Zip Generation. Do you have the code somewhere in the Packmatic dep? So I can have a look at it.

Hi

  1. My chunked copier is implemented directly on top of ibrowse

  2. Yeah. packmatic/url.ex at develop · evadne/packmatic · GitHub but keep in mind it does not expose a stream.

How were you able to handle multipart upload?

Hi Agustin, did you ever solve your problem? I’m currently stuck at the exact same spot, turning the tiny chunks from HTTPoison into a stream that works for S3 (also from gdrive).

Cheers!
Marcel

@andrewdesmondm @marcelfahle I’ve ended up combining the solution for downloading large files from Poeticoding and the ExAws dependency (see ExAws.S3.initiate_multipart_upload).
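
The exact code was not posted, so the following is only a sketch of what the multipart steps can look like with ExAws, not the solution actually used. It assumes the ex_aws and ex_aws_s3 packages are installed and configured, and that parts is an enumerable of binaries that are each at least 5 MiB, except possibly the last. The MultipartSketch module and its function names are made up for this illustration.

```elixir
defmodule MultipartSketch do
  @moduledoc """
  Sketch of a server-side S3 multipart upload with ExAws.
  Assumes `parts` yields binaries of at least 5 MiB each, except the last.
  """

  def upload(parts, bucket, key) do
    # 1. Open the upload; S3 returns an id that ties the parts together.
    %{body: %{upload_id: upload_id}} =
      bucket |> ExAws.S3.initiate_multipart_upload(key) |> ExAws.request!()

    # 2. Upload each part (numbered from 1) and remember its ETag.
    uploaded =
      parts
      |> Stream.with_index(1)
      |> Enum.map(fn {part, number} ->
        %{headers: headers} =
          bucket
          |> ExAws.S3.upload_part(key, upload_id, number, part)
          |> ExAws.request!()

        {number, etag(headers)}
      end)

    # 3. Ask S3 to assemble the parts into the final object.
    bucket
    |> ExAws.S3.complete_multipart_upload(key, upload_id, uploaded)
    |> ExAws.request!()
  end

  # Header names may arrive in any case, so compare case-insensitively.
  def etag(headers) do
    {_name, value} =
      Enum.find(headers, fn {name, _value} -> String.downcase(name) == "etag" end)

    value
  end
end
```

If a part fails, the upload stays open on S3 until it is aborted, so real code would probably want to call ExAws.S3.abort_multipart_upload/3 in that case. For a source that is already an Elixir stream, ExAws.S3.upload/4 performs the same initiate, upload and complete steps for you.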

Thanks Agustin! I ended up doing something very similar, if not the same. It works pretty well. I threw together a quick demo video:

objects = ExAws.S3.list_objects_v2(bucket, prefix: prefix)
|> ExAws.stream!()
|> Stream.reject(fn %{key: key} ->
  String.split(key, "/")
  |> Enum.any?(& &1 == "report")
end)
|> Stream.map(fn %{key: key} ->
  stream = ExAws.S3.download_file(bucket, key, :memory)
  |> ExAws.stream!()

  %{key: key, stream: stream}
end)

# We could have kept piping, but thought this was more readable maybe.

Stream.map(objects, &(Zstream.entry(&1.key, &1.stream)))
|> Zstream.zip()
|> Aw.Stream.chunk_by_bytes({5, :MiB})
|> ExAws.S3.upload(bucket, Path.join(prefix, "foobar.zip"))
|> ExAws.request!()

Not exactly the same, but our goal was “taking many files on S3, downloading them, and zipping them into a single file that we upload to S3.” This is all done with a single Elixir stream.

Despite the output being many gigabytes (maybe close to a terabyte), the process only uses a couple of hundred megabytes of memory.

I’m not sure what the Google Drive API is like, but you should be able to Stream.resource that shit! Once it’s in stream form, everything else is gravy.
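
As a rough illustration of the Stream.resource idea, here is a minimal sketch that turns chunks arriving as process messages into a lazy stream. To keep it runnable without any dependencies, the {:chunk, ref, data} and {:done, ref} tuples are hypothetical stand-ins for %HTTPoison.AsyncChunk{} and %HTTPoison.AsyncEnd{}, and the MailboxStream module is a made-up name.

```elixir
defmodule MailboxStream do
  @moduledoc """
  Sketch: expose chunks that arrive as process messages as a lazy Stream.
  The message tuples are stand-ins for HTTPoison's async structs.
  """

  def stream(start_fun, timeout \\ 5_000) do
    Stream.resource(
      # Runs lazily in the consuming process and returns the request ref.
      start_fun,
      fn ref ->
        receive do
          # One chunk arrived: emit it and keep the same state.
          {:chunk, ^ref, data} -> {[data], ref}
          # The producer is finished: end the stream.
          {:done, ^ref} -> {:halt, ref}
        after
          timeout -> raise "no message within #{timeout} ms"
        end
      end,
      # Real code would release the HTTP connection here.
      fn _ref -> :ok end
    )
  end
end

# Fake "download": a process that sends three chunks to the consumer.
start = fn ->
  ref = make_ref()
  consumer = self()

  spawn(fn ->
    for data <- ["he", "ll", "o"], do: send(consumer, {:chunk, ref, data})
    send(consumer, {:done, ref})
  end)

  ref
end

start |> MailboxStream.stream() |> Enum.join() |> IO.puts()
```

With HTTPoison, the start function would issue the request with stream_to: self() and async: :once, and the receive clauses would call HTTPoison.stream_next/1 after each message, just like the receive loop earlier in this thread. The resulting stream could then be regrouped into parts of at least 5 MiB and handed to ExAws.S3.upload.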
