ExCmd - communicate with external programs with back pressure

Hi all,

For the last few weeks I’ve been tinkering with the idea of streaming data through an external program (think of streaming video through an ffmpeg command and receiving the output back), focusing mainly on communicating with long-running programs with back-pressure. After exploring many approaches I settled on this one. ExCmd uses named FIFOs to solve back-pressure and other issues. It also uses odu (which is based on goon) to fill gaps in Erlang ports.

Currently, it’s at an early stage. I’m still thinking about the interface it should provide to expose all its functionality for different use cases effectively.

Please check it out and share your feedback :slight_smile:

Background

Why not use built-in ports?

  • Unlike BEAM ports, ExCmd puts back-pressure on the external program
  • Proper program termination. No more zombie processes
  • Ability to close stdin and wait for output (with ports one cannot selectively close stdin)

While exploring the options, I also played around with another approach, which does not use named FIFOs. It’s more like GenStage: the receiving BEAM process “demands” output from the external program using stdin and stdout. But it has its own set of issues.

Added the ability to stream input and output. Now one can do something like this:

def audio_stream!(stream) do
  # read from stdin and write to stdout
  proc_stream = ExCmd.stream!("ffmpeg", ~w(-i - -f mp3 -))

  Task.async(fn ->
    Stream.into(stream, proc_stream)
    |> Stream.run()
  end)

  proc_stream
end

File.stream!("music_video.mkv", [], 65535)
|> audio_stream!()
|> Stream.into(File.stream!("music.mp3"))
|> Stream.run()

Along with this, there are many changes related to the interface and error handling. Please check the documentation for more details.

v0.1.0

Github

I have installed both ex_cmd and odu (and have put it in my PATH). Then:

ExCmd.stream!("find", ["/Users/dimi/Downloads/temp", "-name", "*.html"]) |> Enum.to_list()

Blocks for 5 seconds and gives me this:

** (exit) exited in: GenServer.call(#PID<0.382.0>, {:open_fifo, :output, :read}, 5000)
    ** (EXIT) time out
    (elixir 1.10.2) lib/gen_server.ex:1023: GenServer.call/3
    (ex_cmd 0.1.0) lib/ex_cmd/stream.ex:43: anonymous fn/1 in Enumerable.ExCmd.Stream.reduce/3
    (elixir 1.10.2) lib/stream.ex:1407: anonymous fn/5 in Stream.resource/3
    (elixir 1.10.2) lib/enum.ex:3383: Enum.reverse/1
    (elixir 1.10.2) lib/enum.ex:2982: Enum.to_list/1

Am I doing something wrong? The directory has 10 files in total, 2 of which are HTML.

Hi @dimitarvp, thanks for taking your time to check it.

In this case we are trying to read the output FIFO (that is, the output of the find command) without writing any input. This fails because the external wrapper command, odu, expects a process to open the input FIFO in write mode.
This is because we cannot know whether the external command needs input or not. odu assumes every command needs an input stream and blocks until the streams are connected.

So to fix it we just have to open a dummy writer:

proc_stream = ExCmd.stream!("find", ["/Users/dimi/Downloads/temp", "-name", "*.html"])
Task.async(fn -> Enum.into([], proc_stream) end)
proc_stream |> Enum.to_list()

I think we should have a better error message, and maybe an option for explicitly saying that a command does not use input.

Thanks for clarifying. This code is however not intuitive (and why do we need to spawn a Task for it to work?). Would you consider adding options to ExCmd.stream! then? For example, something like ExCmd.stream!(..., :stdout_only) would invisibly execute the code you pasted above.

I agree. I’ll add an option to disable stdin.


why do we need to spawn a Task for it to work?

tl;dr: if we do not spawn a separate process, it will cause a deadlock.

For anyone interested in this topic:
Stream hides the synchronization happening between BEAM processes and external programs under the hood.

Without the Task it would look something like this:

proc_stream = ExCmd.stream!("find", ["/Users/dimi/Downloads/temp", "-name", "*.html"])
Enum.into([], proc_stream)
proc_stream |> Enum.to_list()

This is roughly equivalent to the following steps, expressed as syscalls:

1. create stream struct
2. syscall: fd = open("input.pipe", O_WRONLY)
3. syscall: close(fd)
4. run external program (at some point odu will call exec("cmd"))
5. syscall: fd = open("output.pipe", O_RDONLY)
...

Step 2 is a blocking call: it returns only after “input.pipe” has been opened by the reader, which is the external program. But we start the external program at step 4, hence the deadlock.

This behavior is more visible if one uses the low-level API instead of the stream abstraction.

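open does have a non-blocking flag, O_NONBLOCK, but:

  1. BEAM does not support passing this flag
  2. under POSIX, a non-blocking open in write-only mode fails with ENXIO while no reader has the FIFO open (and opening a FIFO in read-write mode is undefined), so it does not help here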

Interestingly, before OTP 21 allowed opening a FIFO, a popular solution for opening a FIFO in Erlang/Elixir was to use :erlang.open_port. :erlang.open_port is a blocking call too, but in this case it blocks the whole VM!

{_, 0} = System.cmd("mkfifo", ["test.pipe"])
spawn(fn -> Port.open('test.pipe', [:eof, :out]) end)
:timer.sleep(500) # force scheduler to execute fifo open
IO.puts "This line is never printed!"

Thanks a lot for the explanation! Makes sense.

Options like :stdin_only, :stdout_only and :stdin_and_stdout (the default) would help a lot. Boilerplate should be hidden away behind options and/or convenience functions like stream_stdin!, stream_stdout! and stream! (which expects both as it is right now).

Not sure about the names, they might not be good.

As for odu itself, it introduces a needlessly complex external dependency installation that some programmers might not be willing to subject themselves to. I’d suggest you write an in-app small Rust library. I can help you integrate Elixir with Rust – the release candidate of Rustler 0.22 has a much nicer and shorter syntax compared to previous versions and is now a joy to use.

Another thing: would there be a way to stream the spawned command’s output line by line? Currently I have to store a rather huge string in memory and then call String.split(the_whole_command_output, "\n"). Defeats the whole purpose of using Stream really. :frowning:

I am also interested in your opinion on:

Options like :stdin_only, :stdout_only and :stdin_and_stdout (the default) would help a lot. Boilerplate should be hidden away behind options and/or convenience functions like stream_stdin!, stream_stdout! and stream! (which expects both as it is right now).

Yes. I’m more inclined towards just adding no_stdin and no_stdout options and keeping the same stream! interface, just to avoid adding more functions, which can be confusing. But we can have separate functions if that makes more sense.

As for odu itself, it introduces a needlessly complex external dependency installation that some programmers might not be willing to subject themselves to. I’d suggest you write an in-app small Rust library. I can help you integrate Elixir with Rust – the release candidate of Rustler 0.22 has a much nicer and shorter syntax compared to previous versions and is now a joy to use.

Agreed, I want to ditch odu and have everything in a single library. When I started odu, I was mostly experimenting, and keeping it separate seemed simpler. Rustler looks interesting, I’ll look into it as soon as I get some time :slight_smile:.

Another thing: currently all this ceremony exists because BEAM does not expose file descriptors for stdin and stdout. There is probably a valid reason for that. But if we somehow get a NIF/driver interface which lets us access the stdin/stdout fds, then we can just use :file.open with those fds and get rid of the whole FIFO thing. This is all hand-waving; there might be some issue in an actual implementation.

Another thing: would there be a way to stream the spawned command’s output line by line? Currently I have to store a rather huge string in memory and then call String.split(the_whole_command_output, "\n"). Defeats the whole purpose of using Stream really.

We can split as soon as we get the output, right? Something like:

    proc_stream
    |> Stream.transform("", fn data, acc ->
      lines = IO.iodata_to_binary([acc | data]) |> String.split("\n")
      Enum.split(lines, length(lines) - 1)
    end)

The size of the data chunk we get (data in the above example) depends on the command we are running, on when that command flushes its output, and on when we issue the read. This size is limited by the FIFO buffer size, which is controlled by the OS. Usually this will be at most 65kb (sometimes less on macOS). So unless the user is explicitly collecting the output for something, there should not be a memory leak.

I prefer to avoid adding line splitting to ex_cmd itself. But if enough people want this, we can add it.
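
For anyone who also needs a trailing line that has no final newline, here is a rough variation of the Stream.transform snippet above. It is only a sketch: LineSplitter and the :eof marker are names made up for this illustration and are not part of ex_cmd, and the list of binaries stands in for whatever chunks the command stream yields.

```elixir
defmodule LineSplitter do
  # Hypothetical helper, not part of ex_cmd.
  # Turns a stream of arbitrary chunks into a stream of complete lines.
  def lines(chunks) do
    chunks
    # Append an end marker so the last partial line can be flushed.
    |> Stream.concat([:eof])
    |> Stream.transform("", fn
      :eof, "" ->
        {[], ""}

      :eof, rest ->
        {[rest], ""}

      chunk, rest ->
        parts = String.split(rest <> IO.iodata_to_binary(chunk), "\n")
        # Everything except the last part is a complete line; the last
        # part is carried over until more data (or :eof) arrives.
        {complete, [new_rest]} = Enum.split(parts, -1)
        {complete, new_rest}
    end)
  end
end

["a\nb", "c\n", "d"] |> LineSplitter.lines() |> Enum.to_list()
# => ["a", "bc", "d"]
```

Only one partial line is held at any time, so memory use stays proportional to the longest line rather than to the whole output.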

Let me see if I am doing this right:

  @doc ~S"""
  Returns a `Stream` that yields full file paths corresponding to each HTML file
  inside the specified directory.
  """
  def stream_html_files_excmd(path) when is_binary(path) do
    expanded_path = Path.expand(path)
    stream = ExCmd.stream!("find", [expanded_path, "-name", "*.html"])

    # The `ex_cmd` library expects something to be written to the stdin of the invoked command.
    # So we spawn a separate dummy stdin writer `Task` to satisfy `ex_cmd`.
    Task.async(fn -> Enum.into([], stream) end)

    # Collect the stdout from the spawned command.
    stream
    |> Stream.transform("", fn data, acc ->
      lines = to_string([acc | data]) |> String.split("\n")
      Enum.split(lines, length(lines) - 1)
    end)
  end

Then it can just be used like stream_html_files_excmd("~/data/scraped.website") |> Enum.to_list().

This gives me the list of files that I need (double-checked with previously stored runs of find itself) and is working tens of times faster than Path.wildcard.

What sets your library apart is the Enumerable and Collectable integrations. It took me a bit to brush up on my Stream knowledge, and with your help all is clear now.

Thank you for helping.

That actually sounds much better than what I came up with.

Nope, no need IMO. You are right. Options are enough. Just make sure to document them well and all is good.

Keep in mind that documentation is pretty sparse for 0.22-rc, and that all Rustler guides out there use the older, much more verbose syntax. To get a good understanding of how to use the new and terser format, @scrogson (one of Rustler’s maintainers) recommended that I inspect his own projects for inspiration. His franz GitHub repo got the job done for me and you can see the result in the sqlite3 DB adapter library that I am currently writing here: xqlite/native/xqlitenif/src/lib.rs at master · dimitarvp/xqlite · GitHub

The main issue which ExCmd tries to solve, and which none of these libraries solve, is having a proper stream with back-pressure. In these libraries the progress of the external command is uncontrolled.

Internally they all use a port and some sort of middleware program for IO. We can never have back-pressure (or a limit) using ports, as the process mailbox is unbounded. ExCmd also uses a port and a middleware program (odu), but it uses the port only for controlling the external program, not for IO. For IO it uses named pipes (FIFOs), which are demand-driven.
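
To make the mailbox point concrete, here is a small sketch. It uses a plain process instead of a port, on the assumption that the mailbox of a port owner behaves like any other process mailbox: the sender is never slowed down, however far behind the receiver is. MailboxSketch and flood/1 are names made up for this illustration.

```elixir
defmodule MailboxSketch do
  # Hypothetical helper: sends `count` messages to a process that never
  # reads them and reports how many are queued in its mailbox afterwards.
  def flood(count) do
    # The consumer only waits for :stop, so every other message piles up.
    consumer =
      spawn(fn ->
        receive do
          :stop -> :ok
        end
      end)

    # send/2 never blocks, no matter how long the queue already is.
    for i <- 1..count, do: send(consumer, {:chunk, i})

    {:message_queue_len, queued} = Process.info(consumer, :message_queue_len)
    send(consumer, :stop)
    queued
  end
end

MailboxSketch.flood(10_000)
```

Nothing in this exchange tells the sender to wait, which is the gap a demand-driven channel is meant to close.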

One can just write the output to a file and read it back from there to “solve” this. But:

  • This involves disk IO for writing and reading, which adds latency
  • We cannot control the speed of the external program
  • We have to clean up these files properly, which might not be trivial
  • And IMHO the ergonomics are better with the ExCmd approach, as the stream is more composable

Apart from these issues,

  1. erlexec: It is more focused on orchestrating and linking the external command than on communicating with it. If erlexec accepted an OS pid for its functionality, maybe we could use erlexec with ExCmd
  2. porcelain: There are a few important issues, like zombie processes and not having the ability to forcefully kill the program
  3. rambo: Similar to porcelain, but rambo does not allow streaming input to stdin, so all input must be kept in memory or passed to the command by writing it to a file. It collects the output binary in memory by appending to it, which is not efficient

I think when these libraries were created they were not trying to solve the issues I mentioned, so I think it’s not correct to compare them. Please let me know if anything is incorrect.

It collects output binary in memory by appending it which is not efficient

Fixed in Rambo 0.3.2. :slightly_smiling_face:

Replaced by what behaviour?

Output is collected in IO lists now. I’m considering a Stream API too, but without backpressure since ports are limited as mentioned.
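
For readers unfamiliar with the difference, here is a tiny sketch of the two ways of collecting output. It is an illustration only, not Rambo's actual code, and OutputCollector is a made-up name. Appending produces a new, larger binary for every chunk (the runtime can optimise some of these cases), while an IO list just nests references to the chunks and is flattened once at the end, if at all.

```elixir
defmodule OutputCollector do
  # Hypothetical helper for illustration only.

  # Collect by binary appending: the accumulator grows with every chunk.
  def append(chunks) do
    Enum.reduce(chunks, "", fn chunk, acc -> acc <> chunk end)
  end

  # Collect as an IO list: each step only wraps the previous accumulator.
  def iolist(chunks) do
    Enum.reduce(chunks, [], fn chunk, acc -> [acc, chunk] end)
  end
end

chunks = ["foo", "bar", "baz"]

OutputCollector.append(chunks)
# => "foobarbaz"

chunks |> OutputCollector.iolist() |> IO.iodata_to_binary()
# => "foobarbaz"
```

Many IO functions accept the IO list directly, so the final conversion to a binary is often unnecessary.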

Anyway, don’t wish to hijack this thread. Continue Rambo discussions here.

v0.3.0

This is a major release with a completely different approach to solving back-pressure. Ditched named pipes in favor of a slightly more complicated protocol. And with that, ExCmd no longer has scheduler issues.

I did consider this approach before, but dropped it after I hit a blocker. Thanks to @ananthakumaran for clearing it up :+1:

Changes

  • fixed BEAM scheduler issues
  • demand-driven protocol for back-pressure instead of named pipes
  • a few breaking changes to options
  • many internal changes for maintainability, such as ExCmd.Process now using GenStateMachine instead of GenServer
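
To give a feel for what “demand-driven” means, here is a toy sketch between two BEAM processes. It is not ExCmd's actual protocol, which talks to an external program; DemandSketch and the message shapes ({:demand, pid}, {:data, pid, chunk}, {:eof, pid}) are assumptions made up for this illustration. The producer sends exactly one chunk per demand message, so it can never run ahead of the consumer.

```elixir
defmodule DemandSketch do
  # Hypothetical illustration of a demand-driven exchange.

  # Starts a producer that hands out `chunks` one at a time, on request.
  def start_producer(chunks), do: spawn(fn -> produce(chunks) end)

  defp produce(chunks) do
    receive do
      {:demand, consumer} ->
        case chunks do
          [chunk | rest] ->
            send(consumer, {:data, self(), chunk})
            produce(rest)

          [] ->
            send(consumer, {:eof, self()})
        end
    end
  end

  # Wraps the producer in a lazy stream; each element costs one demand.
  def stream(producer) do
    Stream.resource(
      fn -> producer end,
      fn producer ->
        send(producer, {:demand, self()})

        receive do
          {:data, ^producer, chunk} -> {[chunk], producer}
          {:eof, ^producer} -> {:halt, producer}
        end
      end,
      # Stop the producer if the consumer halts early.
      fn producer -> Process.exit(producer, :kill) end
    )
  end
end

["a", "b", "c"]
|> DemandSketch.start_producer()
|> DemandSketch.stream()
|> Enum.take(2)
# => ["a", "b"]
```

With Enum.take(2) the third chunk is never requested, so the producer never sends it.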

Nice update @akash-akya. I’ve decided on a protocol approach for my next update because I need a cross-platform solution, but you got there first. :slightly_smiling_face:

A nice side effect of removing the FIFO dependency is Windows support. I finally got time to test and confirm it after a minor fix.

v0.4.1