Streaming tar files

Hello,

I am trying to download a tarball, extract and decompress a file from it and write it somewhere on the disk in one go. Basically wget -qO- http://my_server/archive.tar | tar -xf - data/some.tar.gz -O | tar -xzf -.

Since erl_tar does not look like it supports streams I decided using a port with tar to do the extraction but I am failing doing so.

Here is the source of my GenServer that opens the port and is supposed to extract the inner gzip compressed file:

defmodule UnTar do
  use GenServer
  require Logger

  defstruct port: nil, collector_fun: nil, collector_acc: nil

  def start_link(opts \\ []) do
    {server_opts, otp_opts} = Keyword.split(opts, [:into])
    GenServer.start_link(__MODULE__, server_opts, opts)
  end

  @impl true
  def init(opts) do
    into = Keyword.fetch!(opts, :into)

    tar = tar_exe()

    tar_args = ["-xf", "-", "data/some.tar.gz", "-O"]

    port_args = [
      {:args, tar_args},
      :use_stdio,
      :binary,
      :exit_status
    ]

    port = Port.open({:spawn_executable, tar}, port_args)
    {collector_acc, collector_fun} = Collectable.into(into)

    {:ok,
     %__MODULE__{
       port: port,
       collector_fun: collector_fun,
       collector_acc: collector_acc
     }}
  end

  def send_chunk(pid, chunk) do
    GenServer.call(pid, {:send_chunk, chunk})
  end

  @impl true
  def handle_call({:send_chunk, chunk}, _from, state) do
    Port.command(state.port, chunk)
    {:reply, :ok, state}
  end

  @impl true
  def handle_info(
        {_port, {:data, data}},
        %{collector_fun: fun, collector_acc: acc} = state
      ) do
    Logger.info("receiving chunk form port")
    new_acc = apply(fun, [acc, {:cont, data}])
    {:noreply, %{state | collector_acc: new_acc}}
  end

  def handle_info({_port, {:exit_status, 0}}, state) do
    Logger.info("exiting normal")
    {:stop, :normal, %{state | port: nil}}
  end

  def handle_info({_port, {:exit_status, status}}, state) do
    Logger.info("exiting with #{status}")
    {:stop, {:exit, status}, %{state | port: nil}}
  end

  defp tar_exe do
    System.find_executable("tar") || raise("Could not find `tar` executable.")
  end
end

This is how I use the server:

source_stream = File.stream!("path/to/source_archive.tar", [:read, :binary], 512)
target_stream = File.stream!("path/to/target", [:write, :binray])

{:ok, pid} = UnTar.start_link(into: target_stream)

source
|> Stream.map(fn chunk ->
  UnTar.send_chunk(pid, chunk)
end)
|> Stream.run()

At the end tar prints prints out:

/usr/bin/tar: data/some.tar.gz: Cannot write: Broken pipe                                                                                                                                                                          
/usr/bin/tar: Exiting with failure status due to previous errors  

The resulting file is corrupted and tar does not send any exit code to my server.
Any idea what I am doing wrong?

Edit:
Forgot to add the bytes_or_line arg to the source stream. I wonder if this might have something to do with tar not being able to terminate the end of the file. tar uses a block size of 512 bytes and if I don’t provide the block size tar is additionally printing /usr/bin/tar: A lone zero block at 51035

1 Like