Hello,
I am trying to download a tarball, extract and decompress a file from it and write it somewhere on the disk in one go. Basically wget -qO- http://my_server/archive.tar | tar -xf - data/some.tar.gz -O | tar -xzf -
.
Since erl_tar
does not look like it supports streams, I decided to use a port with tar
to do the extraction, but I am failing to do so.
Here is the source of my GenServer
that opens the port and is supposed to extract the inner gzip compressed file:
defmodule UnTar do
  @moduledoc """
  GenServer that pipes chunks of a tar archive through an external `tar`
  process and collects everything `tar` writes to its stdout.

  The port is started as `tar -xf - data/some.tar.gz -O`: the archive is read
  from stdin and the member `data/some.tar.gz` is written to stdout. Each chunk
  of output received from the port is pushed into the collectable passed as
  the `:into` option.

  The server stops with reason `:normal` when `tar` exits with status `0` and
  with `{:exit, status}` otherwise. On termination the collector is finalised
  with `:done` only after a zero exit status; in every other case it is
  finalised with `:halt`.

  NOTE(review): nothing in this module closes the port or otherwise signals
  end-of-input to `tar` after the last chunk was sent - confirm how `tar` is
  expected to learn that the archive is complete.
  """

  use GenServer
  require Logger

  defstruct port: nil, collector_fun: nil, collector_acc: nil

  @doc """
  Starts the server.

  ## Options

    * `:into` - (required) a `Collectable` that receives the extracted bytes.

  All remaining options are forwarded to `GenServer.start_link/3`.
  """
  @spec start_link(keyword()) :: GenServer.on_start()
  def start_link(opts \\ []) do
    # Only the GenServer-specific options may reach GenServer.start_link/3;
    # :into is consumed by init/1.
    {server_opts, otp_opts} = Keyword.split(opts, [:into])
    GenServer.start_link(__MODULE__, server_opts, otp_opts)
  end

  @doc """
  Writes `chunk` to the stdin of the `tar` process and returns `:ok`.
  """
  @spec send_chunk(GenServer.server(), iodata()) :: :ok
  def send_chunk(pid, chunk) do
    GenServer.call(pid, {:send_chunk, chunk})
  end

  @impl true
  def init(opts) do
    into = Keyword.fetch!(opts, :into)
    tar = tar_exe()
    tar_args = ["-xf", "-", "data/some.tar.gz", "-O"]

    port_args = [
      {:args, tar_args},
      :use_stdio,
      :binary,
      :exit_status
    ]

    port = Port.open({:spawn_executable, tar}, port_args)
    {collector_acc, collector_fun} = Collectable.into(into)

    {:ok,
     %__MODULE__{
       port: port,
       collector_fun: collector_fun,
       collector_acc: collector_acc
     }}
  end

  @impl true
  def handle_call({:send_chunk, chunk}, _from, %__MODULE__{port: port} = state) do
    Port.command(port, chunk)
    {:reply, :ok, state}
  end

  # Each clause binds `port` twice so that only messages coming from the port
  # owned by this server are handled.
  @impl true
  def handle_info(
        {port, {:data, data}},
        %__MODULE__{port: port, collector_fun: fun, collector_acc: acc} = state
      ) do
    Logger.info("receiving chunk form port")
    new_acc = fun.(acc, {:cont, data})
    {:noreply, %{state | collector_acc: new_acc}}
  end

  def handle_info({port, {:exit_status, 0}}, %__MODULE__{port: port} = state) do
    Logger.info("exiting normal")
    {:stop, :normal, %{state | port: nil}}
  end

  def handle_info({port, {:exit_status, status}}, %__MODULE__{port: port} = state) do
    Logger.info("exiting with #{status}")
    {:stop, {:exit, status}, %{state | port: nil}}
  end

  # `port: nil` together with reason `:normal` is only produced by the
  # zero-exit-status clause above, i.e. tar finished successfully.
  @impl true
  def terminate(:normal, %__MODULE__{port: nil} = state), do: finish_collector(state, :done)
  def terminate(_reason, state), do: finish_collector(state, :halt)

  # Tells the collector that no more data will arrive so it can release
  # whatever it holds (e.g. an open file).
  defp finish_collector(%__MODULE__{collector_fun: fun, collector_acc: acc}, command) do
    fun.(acc, command)
    :ok
  end

  # Resolves the absolute path of `tar`, raising when it is not installed.
  defp tar_exe do
    System.find_executable("tar") || raise("Could not find `tar` executable.")
  end
end
This is how I use the server:
# Read the archive in 512-byte chunks (tar's block size).
source_stream = File.stream!("path/to/source_archive.tar", [:read, :binary], 512)
target_stream = File.stream!("path/to/target", [:write, :binary])
{:ok, pid} = UnTar.start_link(into: target_stream)

# Push every chunk through the server; each call returns once the chunk has
# been handed to the port.
Enum.each(source_stream, fn chunk ->
  UnTar.send_chunk(pid, chunk)
end)
At the end tar
prints out:
/usr/bin/tar: data/some.tar.gz: Cannot write: Broken pipe
/usr/bin/tar: Exiting with failure status due to previous errors
The resulting file is corrupted and tar
does not send any exit code to my server.
Any idea what I am doing wrong?
Edit:
Forgot to add the bytes_or_line
arg to the source stream. I wonder if this might have something to do with tar
not being able to determine the end of the file. tar
uses a block size of 512 bytes and if I don’t provide the block size tar
is additionally printing /usr/bin/tar: A lone zero block at 51035