Hello,
To handle uploads of really large files, creating the intermediate Plug.Upload file is too slow for me. In the Plug/Phoenix interaction, Plug.Parsers.MULTIPART first writes all of the uploaded content to an intermediate file. Only once that is done is my route invoked, with the %Plug.Upload{} struct in the params. I then have to process that file and also upload it further to a storage backend (no, not S3, so please don't point me to S3 pre-signed URLs). This results in a total transaction time that is double what I want.
I currently have my processing + uploading in place with a GenStage setup:
- producer: GenStage.from_enumerable(File.stream!(testfile))
- producer_consumer with BroadcastDispatcher
- consumer 1: extracting metadata from the stream, ignoring everything else
- consumer 2: uploading the stream to a backend storage system
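For reference, here is roughly how I would sketch that pipeline. Broadcaster and ChunkConsumer are my own names, and Metadata.scan/1 and Storage.put/1 are placeholders for my actual metadata extraction and upload code:

```elixir
# Broadcasting producer_consumer: every subscribed consumer sees every chunk.
defmodule Broadcaster do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)

  def init(:ok),
    do: {:producer_consumer, :ok, dispatcher: GenStage.BroadcastDispatcher}

  # Pass chunks through unchanged; the dispatcher does the fan-out.
  def handle_events(chunks, _from, state), do: {:noreply, chunks, state}
end

# Consumer skeleton; one instance extracts metadata, another uploads.
defmodule ChunkConsumer do
  use GenStage

  def start_link(fun), do: GenStage.start_link(__MODULE__, fun)

  def init(fun), do: {:consumer, fun}

  def handle_events(chunks, _from, fun) do
    Enum.each(chunks, fun)
    {:noreply, [], fun}
  end
end

# Wiring, reading the test file in 64 KiB chunks:
{:ok, producer} = GenStage.from_enumerable(File.stream!("testfile", [], 64 * 1024))
{:ok, broadcaster} = Broadcaster.start_link([])
{:ok, meta} = ChunkConsumer.start_link(fn chunk -> Metadata.scan(chunk) end)
{:ok, store} = ChunkConsumer.start_link(fn chunk -> Storage.put(chunk) end)

GenStage.sync_subscribe(broadcaster, to: producer)
GenStage.sync_subscribe(meta, to: broadcaster)
GenStage.sync_subscribe(store, to: broadcaster)
```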
This setup works so far, but I'm having a hard time getting the first part in place: processing the HTTP request body (from Plug.Conn), exposing it as a stream or producer, and connecting it to the producer_consumer of the GenStage setup.
My idea at the moment is to disable the standard Plug.Parsers.MULTIPART and implement my own multipart parser. When my MultipartStream.parse/5 is called, I would set up the stream/producer and return it in the params. Later in the call chain, one of my routes is the HTTP upload controller function. There I would retrieve the stream/producer from the params, start a new producer_consumer + consumers process tree, and hook everything up.
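To make the parser idea concrete, here is a minimal, untested sketch of what I have in mind. MultipartStream is my own module, it only handles the first part of the multipart body, and because the stream is lazy, the actual Plug.Conn.read_part_body/2 calls would only happen once the controller consumes it. One caveat I already see: the conn returned from parse/5 is the one from before the body was read.

```elixir
defmodule MultipartStream do
  @behaviour Plug.Parsers

  @impl true
  def init(opts), do: opts

  @impl true
  def parse(conn, "multipart", _subtype, _params, opts) do
    # Lazily read the body of the first part. The stream must be consumed
    # later in the same process that owns the conn.
    stream =
      Stream.resource(
        fn ->
          {:ok, _headers, conn} = Plug.Conn.read_part_headers(conn, opts)
          conn
        end,
        fn
          {:done, conn} ->
            {:halt, conn}

          conn ->
            case Plug.Conn.read_part_body(conn, opts) do
              {:more, chunk, conn} -> {[chunk], conn}
              {:ok, chunk, conn} -> {[chunk], {:done, conn}}
            end
        end,
        fn _conn -> :ok end
      )

    {:ok, %{"upload_stream" => stream}, conn}
  end

  def parse(conn, _type, _subtype, _params, _opts), do: {:next, conn}
end
```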
So far my thinking! But I am running in circles trying to implement it properly. Following this line of thinking, I have to create a Stream implementation for which I control both the input (source) and the output (sink). In my naive view, the source side (putting binary data on the stream) should be handled in the Plug process, while the sink side is handled in one of the GenStage processes. At this point I'm lost in the whole mailbox message handling.
- Should the code that reads from Plug.Conn and puts the data on a stream run in the same Plug process that handles the connection? Or is it safe to call Plug.Conn.read_part_body/2 from another process?
- How do I implement a custom Stream or Consumer compatible with the Plug request handling?
- I need a way to wait for this whole setup to finish before I return from the Phoenix route function.
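On the last point, the best I have come up with so far is to keep consuming the stream in the controller process (I assume the body has to be read from the process handling the request, at least with Cowboy), push the chunks into a small buffering producer, and then block on a monitor until the consumer is done. A rough sketch, where BufferProducer is my own module and StorageConsumer stands in for the uploading consumer:

```elixir
# A producer fed from the outside via push/2; it respects demand by
# queueing chunks it cannot dispatch yet.
defmodule BufferProducer do
  use GenStage

  def start_link(_opts), do: GenStage.start_link(__MODULE__, :ok)
  def push(pid, chunk), do: GenStage.call(pid, {:push, chunk}, :infinity)
  def finish(pid), do: GenStage.call(pid, :finish)

  def init(:ok), do: {:producer, {:queue.new(), 0}}

  def handle_call({:push, chunk}, _from, {queue, demand}) do
    {events, queue, demand} = take(:queue.in(chunk, queue), demand, [])
    {:reply, :ok, events, {queue, demand}}
  end

  # Note: a production version should drain the queue before stopping.
  def handle_call(:finish, _from, state), do: {:stop, :normal, :ok, state}

  def handle_demand(incoming, {queue, demand}) do
    {events, queue, demand} = take(queue, demand + incoming, [])
    {:noreply, events, {queue, demand}}
  end

  defp take(queue, 0, acc), do: {Enum.reverse(acc), queue, 0}

  defp take(queue, demand, acc) do
    case :queue.out(queue) do
      {{:value, chunk}, queue} -> take(queue, demand - 1, [chunk | acc])
      {:empty, queue} -> {Enum.reverse(acc), queue, demand}
    end
  end
end

# In the controller:
def upload(conn, %{"upload_stream" => stream}) do
  {:ok, producer} = BufferProducer.start_link([])
  {:ok, store} = StorageConsumer.start_link([])
  GenStage.sync_subscribe(store, to: producer)
  ref = Process.monitor(store)

  # Drive the stream from this process, the one that owns the conn.
  Enum.each(stream, &BufferProducer.push(producer, &1))
  BufferProducer.finish(producer)

  # With the default cancel: :permanent subscription, the consumer should
  # exit once the producer stops, after handling the events already sent.
  receive do
    {:DOWN, ^ref, :process, _pid, _reason} -> :ok
  end

  send_resp(conn, 200, "uploaded")
end
```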
Am I making it too complex?
If someone reading this will be attending the upcoming ElixirConfEU and is willing to do some pair programming with me on this, this would be very much appreciated.
Ringo