Streaming the HTTP upload body into a GenStage pipeline?


For really large uploads, creating the intermediate Plug.Upload file is too slow for me. In the Plug/Phoenix interaction, Plug.Parsers.MULTIPART first writes an intermediate file containing all the uploaded content. Only once that is done is my route invoked, with the %Plug.Upload{} struct in the params. I then have to process that file and upload it further to a storage backend (no, not S3, so please don’t point me to S3 pre-signed URLs). The result is a total transaction time that is double what I want.

I currently have my processing + uploading in place with a GenStage setup:
  • producer: GenStage.from_enumerable(File.stream!(testfile))
  • producer_consumer with BroadcastDispatcher
  • consumer1: extracting metadata from the stream, ignoring all the rest
  • consumer2: uploading the stream to a backend storage system

This setup works so far, but I’m having a hard time getting the first part in place: reading the HTTP request body (from Plug.Conn), exposing it as a stream or producer, and connecting that to the producer_consumer of the GenStage setup.

My idea at the moment is to disable the standard Plug.Parsers.MULTIPART and implement my own multipart parser. When my MultipartStream.parse/5 is called, I would set up the stream/producer and return it in the params. A bit later in the call chain, one of my routes is the HTTP upload controller function. There I retrieve the stream/producer from the params, start a new producer_consumer + consumers process tree, and hook them up to each other.

So much for my thinking! But I am running in circles trying to implement it properly. Following that line of thought, I have to create a Stream implementation for which I control both the input (source) and the output (sink). But in my naive view, the source handling (putting binary data on the stream) should be done in the Plug process, while the sink handling is done in one of the GenStage processes. At this point I’m lost with the whole mailbox message handling.

  • Should the code to read from Plug.Conn and put it on a stream be done in the same Plug process handling the connection? Or is it safe to call Plug.Conn.read_part_body/2 from another process?
  • How do I implement a custom Stream or Consumer compatible with the plug request handling?
  • I need a way to wait on this whole setup to finish before I return from the Phoenix route function.

Am I making it too complex?

If someone reading this will be attending the upcoming ElixirConfEU and is willing to do some pair programming with me on this, this would be very much appreciated.


That’s generally the clue to stop trying to use a single http request to upload the file, but rather use chunked uploads (using for example tus or resumable). Nobody wants to reupload large files from scratch because the network was flaky for a short time or whatever.

Agreed. But even if you split the upload into multiple HTTP requests, I would still like to process it without intermediate Plug.Upload files. So the problem still stands, even with your suggestions.

With chunked uploads you won’t have the whole file within a single request, so you need to store the data somewhere between requests. As you’re obviously handling large files you probably don’t want to do that in memory, so putting chunks onto the filesystem until they can be combined is probably the way to go (move the file created by Plug.Upload to a permanent place). Then, once you have received the full file (i.e. all chunks), you can trigger your GenStage pipeline.

Also chunked uploads let you parallelize the upload of the file, which might give you more speed improvement than preventing the creation of a single file in favor of a single stream.

This blog post is a pretty good reference on Elixir streams, and the Enumerable/Collectable protocols.

You could use Stream.unfold in a controller to lazily read the body.
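
A minimal sketch of that idea, under a few assumptions: BodyStream and its read_fun argument are hypothetical names, not part of Plug. The reader is only expected to return the same shapes as Plug.Conn.read_body/2 ({:more, chunk, state}, {:ok, chunk, state} or {:error, reason}), which keeps the example runnable without a real connection.

```elixir
defmodule BodyStream do
  # Hypothetical helper, not part of Plug.
  # `read_fun` takes the current state (the conn, in a controller) and must
  # return {:more, chunk, state}, {:ok, chunk, state} or {:error, reason},
  # i.e. the return shapes of Plug.Conn.read_body/2.
  def stream(state, read_fun) do
    Stream.unfold({:cont, state}, fn
      # The previous read was the last one: end the stream.
      :done ->
        nil

      {:cont, current} ->
        case read_fun.(current) do
          {:more, chunk, next} -> {chunk, {:cont, next}}
          {:ok, chunk, _next} -> {chunk, :done}
          {:error, reason} -> raise "body read failed: #{inspect(reason)}"
        end
    end)
  end
end
```

In a controller this could presumably be called as BodyStream.stream(conn, &Plug.Conn.read_body(&1, length: 64_000)), provided no parser has consumed the body already. Two caveats of this sketch: it yields the raw body rather than decoded multipart parts, and the updated conn from the last read is lost inside the stream.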

It might be simpler to write a Collectable implementation instead of a GenStage producer/consumer.
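
As a rough illustration of the Collectable route, here is a sketch of a sink that forwards every collected chunk to another process as a message. ChunkSink, its pid field and the {:chunk, data} / :eof / :aborted messages are all made up for this example; a real uploader would define its own protocol and probably add back-pressure, which plain message passing does not give you.

```elixir
defmodule ChunkSink do
  # Hypothetical struct: `pid` is the process that should receive the chunks.
  defstruct [:pid]

  defimpl Collectable do
    # Collectable.into/1 returns the initial accumulator and a collector fun.
    def into(%{pid: pid} = sink) do
      collector = fn
        # One element of the source enumerable: forward it to the receiver.
        acc, {:cont, chunk} ->
          send(pid, {:chunk, chunk})
          acc

        # Source exhausted: signal end of data and return the sink itself.
        acc, :done ->
          send(pid, :eof)
          acc

        # Collection aborted (e.g. the source raised): tell the receiver.
        _acc, :halt ->
          send(pid, :aborted)
          :ok
      end

      {sink, collector}
    end
  end
end
```

Any enumerable of binaries can then be drained with Enum.into(chunks, %ChunkSink{pid: uploader_pid}), where uploader_pid stands for whatever process performs the backend upload.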

I would also like to second this use case, as I want to stream the user’s file upload directly to another server. The reason being that my web server has very small disk (think micro instances) as it only acts as a load balancer to send files to worker servers.

Plug.Upload writes the file to a temp directory, which makes it impossible to use with big files (in the range of 50 - 100 GB).

It would be nice if it could take an option for a stream to write to, instead of generating the file internally. The default could remain the temp file when the option is not specified.

Generally I’d advise sending the data directly to the other server instead. Though if you really want to, you can implement a custom Plug.Parsers parser to do it in place of the default one (Plug.Parsers.MULTIPART).
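
For what it’s worth, a skeleton of such a parser could look roughly like the sketch below. PassthroughMultipart is a hypothetical name, and the approach is an assumption rather than an official recipe: the parser claims multipart/form-data requests but does not read the body, so that a later plug or controller can still consume it (for example with Plug.Conn.read_part_headers/2 and Plug.Conn.read_part_body/2). The init/1 and parse/5 callbacks and their return values are those of the Plug.Parsers behaviour.

```elixir
defmodule PassthroughMultipart do
  # Hypothetical parser sketch. In a real project you would add
  # `@behaviour Plug.Parsers` here; it is left out so that this snippet
  # compiles without Plug as a dependency.

  # Options given to Plug.Parsers are passed through unchanged.
  def init(opts), do: opts

  # Claim multipart/form-data but leave the request body unread, so no
  # temp file is written and the body can be streamed later on.
  def parse(conn, "multipart", "form-data", _content_type_params, _opts) do
    {:ok, %{}, conn}
  end

  # Every other content type is handed to the next configured parser.
  def parse(conn, _type, _subtype, _content_type_params, _opts) do
    {:next, conn}
  end
end
```

It would then be listed in place of :multipart in the parsers: option of Plug.Parsers in the endpoint. Whether the body is still readable at the point where the controller runs depends on the rest of the plug pipeline, so treat this as a starting point to experiment with.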