Merging together a chunked file using streams

I’m trying to merge file chunks into one file in an efficient and idiomatic way.

Here is my attempt so far:

state.upload_chunk_files
|> Enum.sort(&sort_chunk_paths/2)
|> Enum.reduce(File.stream!("output.png"), fn chunk_path, file_stream ->
    Stream.into(file_stream, File.stream!(elem(chunk_path, 1)))
    end)
|> Stream.run()

state.upload_chunk_files is a keyword list where the key is the order and the value is the path to the chunk.

Essentially what I’m trying to do is loop over all the chunks, stream each chunk, and append each chunk stream to the “full file” stream. I get no errors in my code, but output.png is always empty.

I also tried appending the chunk binary instead of the stream:

...
Stream.into(file_stream, File.read!(elem(chunk_path, 1)))
...

But I get the same empty output file.

Stream.into/2’s first argument is where the data comes from; the second (the collectable) is where the data goes.
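
A minimal illustration of that direction of flow (out.txt here is just a throwaway example file):

# the list is the enumerable (source); the File.Stream is the collectable (sink)
["a", "b", "c"]
|> Stream.into(File.stream!("out.txt"))
|> Stream.run()

File.read!("out.txt")
#=> "abc"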


Thanks! Building the chunked file using streams produces a corrupt end file, though. Originally, I had implemented the code like this:

{:ok, full_file_path} = Briefly.create()

state.upload_chunk_files
|> Enum.sort(&sort_chunk_paths/2)
|> Enum.reduce(File.open!(full_file_path, [:append]), &build_file/2)

...

defp build_file(chunk_path, full_file) do
  IO.binwrite(full_file, File.read!(elem(chunk_path, 1)))
  full_file
end

upload_chunk_files is a keyword list that looks like this:

["1": "./data/1234/1", "2": "./data/1234/2"]

This initial implementation works fine until I started testing the memory usage. I realized that even though I’m only writing to the file chunk by chunk, by the time all the chunks are read the entire file is in memory. My initial thought, coming from mutable languages, was that once a read chunk had been written to the file it would be removed from memory.
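
One way to observe that while testing (a rough sketch; merge_all_chunks/1 is just a hypothetical stand-in for whichever merge implementation is being measured):

# compare binary memory before and after running the merge
before = :erlang.memory(:binary)
merge_all_chunks(state)
IO.inspect(:erlang.memory(:binary) - before, label: "binary bytes retained")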

So I started looking into streams and, with your help, I came up with this implementation:

state.upload_chunk_files
|> Enum.sort(&sort_chunk_paths/2)
|> Enum.reduce(File.stream!(full_file_path, [:append], 200_000), &build_file/2)
 
...

defp build_file(chunk_path, full_file) do
  Stream.into(File.stream!(elem(chunk_path, 1)), full_file) |> Stream.run()
  full_file
end

The proper number of bytes is transferred over to the “full file”; however, when I try to open it, it is corrupted. I tried different modes for opening the stream but could not get the file to open properly.

You can achieve what you described with something like this, I think:

file_streams =
  state.upload_chunk_files
  |> Enum.sort(&sort_chunk_paths/2)
  |> Enum.map(&File.stream!(elem(&1, 1), [], 200_000))

Stream.concat(file_streams)
|> Stream.into(File.stream!(full_file_path))
|> Stream.run()

I think your approach should work as well, but you are reading the chunk files with the default line_or_bytes option, which is :line, and that normalizes the bytes. I have been bitten by this more than once.

To illustrate what I mean:

File.write!("test.txt", "test\r\nok")

File.stream!("test.txt", [], :line)
|> Enum.join("")
|> byte_size()
|> IO.inspect(label: "line")

File.stream!("test.txt", [], 10)
|> Enum.join("")
|> byte_size()
|> IO.inspect(label: "bytes")

Which outputs:

line: 7
bytes: 8

From the docs:

The line_or_bytes argument configures how the file is read when streaming, by :line (default) or by a given number of bytes. When using the :line option, CRLF line breaks ("\r\n") are normalized to LF ("\n").

Wow thank you! Setting the chunked file stream to be read by bytes fixed the issue!
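
For reference, a sketch of what the corrected helper could look like, assuming the same build_file/2 shape as above, with the chunk stream read in byte chunks instead of by :line (the 200_000 chunk size is simply carried over from earlier):

defp build_file(chunk_path, full_file) do
  chunk_path
  |> elem(1)
  # read raw bytes so CRLF sequences are not normalized to LF
  |> File.stream!([], 200_000)
  |> Stream.into(full_file)
  |> Stream.run()

  full_file
end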