Help with `:file` blocking GenServer process

I have a GenServer that reads from and writes to a Linux named pipe (FIFO). I need to do:

{:ok, fifo} = :file.open('/path/to/fifo', [:read, :write, :binary])
{:ok, <<protocol_pattern_match>>} = :file.read(fifo, protocol_size)

The catch is that :file.read/2 blocks the entire calling process until it completes, similar to read() in C. That alone is fine, because I should be able to just do task = Task.async(:file, :read, [fifo, protocol_size]) and get the result in handle_info/2. Maybe I misunderstood the docs, but that doesn't seem to be working for me. I'm expecting to get handle_info({ref, {:ok, <<protocol_pattern_match>>}}, %{task: %{ref: ref}}), but that doesn't seem to happen.

My other issue is that Task.shutdown/2 and :file.close/1 do not seem to work as expected.
The Task docs say that when the calling process exits, the task should exit too, but even after calling
Task.shutdown(state.task, :brutal_kill) I can't call :file.close(state.pipe) in terminate/2 (it just blocks forever).

Anyway, here's the entire GenServer implementation:

defmodule PipeWorker do
  @moduledoc """
  Proxy for IO operations.
  """
  use GenServer
  require Logger

  def start_link(pipe_name) do
    GenServer.start_link(__MODULE__, pipe_name)
  end

  def close(pipe) do
    GenServer.stop(pipe, :normal)
  end

  def read(pipe, size) do
    GenServer.call(pipe, {:read, [size]}, :infinity)
  end

  def write(pipe, packet) do
    GenServer.call(pipe, {:write, [packet]}, :infinity)
  end

  def init(pipe_name) do
    with {_, 0} <- System.cmd("mkfifo", [pipe_name]),
         {:ok, pipe} <- :file.open(to_charlist(pipe_name), [:read, :write, :binary]) do
      {:ok, %{pipe_name: pipe_name, pipe: pipe, task: nil, caller: nil}}
    else
      {:error, _} = error -> {:stop, error}
      {_, _num} -> {:stop, {:error, "mkfifo"}}
    end
  end

  def terminate(_, state) do
    Logger.warn("PipeWorker #{state.pipe_name} exit")
    state.task && Task.shutdown(state.task, :brutal_kill)
    Logger.warn("Pipe Task shut down")
    IO.inspect(state.pipe, label: "pipe")
    # :file.close(state.pipe) # blocks indefinitely no matter what. Shell becomes unresponsive.
    # Logger.warn("Pipe closed")
    File.rm!(state.pipe_name)
    Logger.warn("Pipe removed")
  end

  def handle_call({cmd, args}, {pid, _} = _from, state) do
    IO.inspect([state.pipe | args], label: "Pipe task args")
    task = Task.async(:file, cmd, [state.pipe | args])
    IO.inspect(task, label: "Pipe task")
    {:reply, task.ref, %{state | task: task, caller: pid}}
  end

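  # This is never called?
  def handle_info({ref, result}, %{task: %{ref: ref}, caller: pid} = state) do
    # Remove the monitor Task.async/3 set up and flush its :DOWN message;
    # otherwise that message would arrive here and match no clause.
    Process.demonitor(ref, [:flush])
    IO.inspect({ref, result}, label: "Task result")
    send(pid, {__MODULE__, ref, result})
    {:noreply, %{state | task: nil, caller: nil}}
  end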
end

Okay, while investigating this, I discovered something.

This code will hang forever no matter what. It doesn't matter how much data
is written to the pipe; Erlang/Elixir will never read it.

{:ok, file} = :file.open(path_to_fifo, [:read, :write, :binary])
{:ok, "hello world"} = :file.read(file, 11)
# in a different tab, run `File.write(path_to_fifo, "hello world")`

This code, however, works fine. My guess (and it is only a guess) is that this is because :raw files are now implemented as a NIF.

# notice the `:raw` option.
{:ok, file} = :file.open(path_to_fifo, [:read, :write, :binary, :raw]) 
{:ok, "hello world"} = :file.read(file, 11)
# in a different tab, run `File.write(path_to_fifo, "hello world")`

The only problem with opening in :raw mode is that I can't call read/2 from a Task: a raw file can only be used by the process that opened it, and the Task is not that process. I get the following error:

** (ErlangError) Erlang error: :not_on_controlling_process
        :prim_file.get_fd_data/1
        :prim_file.read/2
        (elixir) lib/task/supervised.ex:89: Task.Supervised.do_apply/2
        (elixir) lib/task/supervised.ex:38: Task.Supervised.reply/5
        (stdlib) proc_lib.erl:249: :proc_lib.init_p_do_apply/3

I guess this is really an Erlang issue rather than an Elixir one, but any help would be appreciated.
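If you want to stay with :raw, one possible workaround is to let the process that performs the blocking read also be the one that opens the file, so it is the controlling process of the raw handle. This is a minimal sketch I have not run against a real FIFO; RawReader and async_read/3 are hypothetical names, and for a FIFO you would pass the modes used above, [:read, :write, :binary, :raw].

```elixir
defmodule RawReader do
  # Spawns a process that opens `path` itself (so it is the controlling
  # process of the raw handle), does one blocking read of `size` bytes,
  # and sends {RawReader, ref, result} back to the caller.
  def async_read(path, size, modes \\ [:read, :binary, :raw]) do
    owner = self()
    ref = make_ref()

    pid =
      spawn(fn ->
        result =
          case :file.open(to_charlist(path), modes) do
            {:ok, fd} ->
              # This process opened fd, so using the raw handle here is allowed.
              res = :file.read(fd, size)
              :file.close(fd)
              res

            {:error, _} = error ->
              error
          end

        send(owner, {__MODULE__, ref, result})
      end)

    {pid, ref}
  end
end
```

The GenServer would then match {RawReader, ref, result} in handle_info/2. Whether such a reader can be cancelled cleanly while it is stuck in a read is a separate question that this sketch does not answer.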


Doesn't that need to be used with Task.await to receive the result? I also think tasks use receive blocks that wouldn't work in a GenServer process.

From the Task docs:


Oh, sorry then.

If I manually change my code to do :file.read(fd, 0), it behaves as expected: I receive the result in handle_info, so I'm at least fairly certain Task is not the problem here.
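For reference, the reply from Task.async arrives at the GenServer as an ordinary {ref, result} message, so it can be handled in handle_info/2 without Task.await/2. Because Task.async also monitors the task, a :DOWN message follows the reply, which is why the Task docs suggest Process.demonitor(ref, [:flush]). A minimal sketch, with the hypothetical module name AsyncWorker and a plain function standing in for the blocking :file.read:

```elixir
defmodule AsyncWorker do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Returns a ref immediately; the caller later receives {AsyncWorker, ref, result}.
  def run(server, fun), do: GenServer.call(server, {:run, fun})

  @impl true
  def init(:ok), do: {:ok, %{tasks: %{}}}

  @impl true
  def handle_call({:run, fun}, {caller, _tag}, state) do
    task = Task.async(fun)
    {:reply, task.ref, put_in(state.tasks[task.ref], caller)}
  end

  @impl true
  # Task.async/1 sends its result to the GenServer as {ref, result}.
  def handle_info({ref, result}, state) when is_reference(ref) do
    # Drop the task's monitor and flush the :DOWN message that would follow.
    Process.demonitor(ref, [:flush])
    {caller, tasks} = Map.pop(state.tasks, ref)
    if caller, do: send(caller, {__MODULE__, ref, result})
    {:noreply, %{state | tasks: tasks}}
  end

  # Any :DOWN that was not flushed above just clears the bookkeeping. A task
  # that crashes still takes the GenServer down through the link.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, %{state | tasks: Map.delete(state.tasks, ref)}}
  end
end
```

This only shows the message flow; it does not help if the function itself never returns, which is the real problem with the non-raw :file.read/2 in this thread.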

The solution is to use ports, which also give you asynchrony. See the previous discussion here: Elixir vs Unix named pipe

I think it was recently announced that Erlang supports pipes, but I can't recall whether that was in port or file. Or I may be completely misremembering it. Anyway, a port should do nowadays.


Sorry, I should have stated this in the original post, but I have tried this as well, and it did not work either.

Interactive Elixir (1.7.3) - press Ctrl+C to exit (type h() ENTER for help)
iex(1)> System.cmd("mkfifo", ["/tmp/fifo", "-m", "0666"])          
{"", 0}
iex(2)> port = :erlang.open_port({:spawn, '/tmp/fifo'}, [:binary]) 
#Port<0.7>
iex(3)> sh: /tmp/fifo: Permission denied
sh: line 0: exec: /tmp/fifo: cannot execute: Permission denied

And in another terminal, writing to the FIFO just hangs until I press Ctrl+C:

echo "hello world" -e > "/tmp/fifo"                       Thu 24 Jan 2019 10:28:34 AM PST
^CAn error occurred while redirecting file '/tmp/fifo'
open: Interrupted system call

So this worked:

~/OSS/elixir[jv-basic-releases %]$ mkfifo test.pipe
~/OSS/elixir[jv-basic-releases %]$ erl
Erlang/OTP 21 [erts-10.0] [source] [64-bit] [smp:8:8] [ds:8:8:10] [async-threads:1] [hipe]

Eshell V10.0  (abort with ^G)
1> Fifo = open_port("test.pipe", [eof]),
1> receive
1> Msg -> io:format("Got ~p", [Msg])
1> end.
Got {#Port<0.5>,{data,"foo\n"}}ok

After typing the receive, I ran echo "foo" > test.pipe in another terminal. Could it be something related to permissions as the error message says?


Aha! I was using :spawn, which wasn't needed. I'll try this method out and report back.

I can confirm that this does in fact allow reading from and writing to a named pipe. However, it does add a buffering requirement, since you can't tell the port how much data you want to read. Either way, thanks for the help!
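The buffering can be kept separate from the GenServer. A minimal sketch with the hypothetical names FrameBuffer, push/2 and take/2; it uses binary pattern matching so frames are cut by byte count (String functions work on graphemes, which is not what a binary protocol wants):

```elixir
defmodule FrameBuffer do
  # Append a newly received port chunk to the buffer.
  def push(buffer, chunk) when is_binary(buffer) and is_binary(chunk),
    do: buffer <> chunk

  # Take `size` bytes if enough have arrived; otherwise keep buffering.
  def take(buffer, size) when is_binary(buffer) and is_integer(size) and size >= 0 do
    case buffer do
      <<frame::binary-size(size), rest::binary>> -> {:ok, frame, rest}
      _ -> {:more, buffer}
    end
  end
end
```

In a port-based worker, handle_info({port, {:data, data}}, state) would push each chunk and, while a read of `size` bytes is pending, call take/2 to see whether the read can be answered yet.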


Good point. Can that be done at the pipe level? I.e. have some UNIX utility that buffers it for you or similar?

I'm still investigating solutions. I whipped up a quick NIF that works similarly to :file.open('test.pipe', [:read, :write, :raw]) but allows calling read/2 from within a Task. Unfortunately, this is the best solution so far, but I don't like putting NIFs into production unless there is no other option.

For completeness here is my (mostly) final implementation:

defmodule PipeWorker do
  @moduledoc """
  Proxy for Pipe IO operations.
  """
  use GenServer
  require Logger

  def start_link(pipe_name) do
    GenServer.start_link(__MODULE__, [pipe_name])
  end

  def close(pipe) do
    GenServer.stop(pipe, :normal)
  end

  def read(pipe, amnt) do
    GenServer.call(pipe, {:read, amnt})
  end

  def write(pipe, packet) do
    GenServer.call(pipe, {:write, packet}, :infinity)
  end

  def init([pipe_name]) do
    with {_, 0} <- System.cmd("mkfifo", [pipe_name]),
         pipe <- :erlang.open_port(to_charlist(pipe_name), [:eof, :binary]) do
      {:ok, %{pipe_name: pipe_name, pipe: pipe, buffer: <<>>, caller: nil, size: nil}}
    else
      {:error, _} = error -> {:stop, error}
      {_, _num} -> {:stop, {:error, "mkfifo"}}
    end
  end

  def terminate(_, state) do
    Logger.warn("PipeWorker #{state.pipe_name} exit")
    :erlang.port_close(state.pipe)
    File.rm!(state.pipe_name)
  end

  def handle_call({:write, packet}, _from, state) do
    reply = :erlang.port_command(state.pipe, packet)
    {:reply, reply, state}
  end

  def handle_call({:read, amnt}, {_pid, ref} = from, %{caller: nil, size: nil} = state) do
    {:reply, ref, %{state | caller: from, size: amnt, buffer: nil}, {:continue, state.buffer}}
  end

  def handle_info({pipe, {:data, data}}, %{pipe: pipe, buffer: buffer} = state) do
    buffer = buffer <> data
    {:noreply, %{state | buffer: nil}, {:continue, buffer}}
  end

  def handle_continue(buffer, %{size: size} = state) when byte_size(buffer) >= size do
    {pid, ref} = state.caller
    # Split by bytes; String.split_at/2 splits by graphemes, which is wrong for binary data.
    {resp, buffer} = :erlang.split_binary(buffer, size)
    send(pid, {__MODULE__, ref, {:ok, resp}})
    {:noreply, %{state | caller: nil, size: nil, buffer: buffer}}
  end

  def handle_continue(buffer, state) do
    {:noreply, %{state | buffer: buffer}}
  end
end

And usage is something like:

# Start the Pipe
{:ok, pid} = PipeWorker.start_link("/tmp/test.pipe")
# Start an asynchronous read of 10 bytes
ref = PipeWorker.read(pid, 10)
# This can obviously be done in `handle_info` of a GenServer or other OTP behaviour.
receive do
  {PipeWorker, ^ref, {:ok, <<protocol_pattern_match>> = data}} -> handle_data(data)
end

For even more completeness, here's the NIF implementation I won't be using: https://github.com/ConnorRigby/pipe_fitter/blob/master/c_src/erl_pipe_nif.c
Usage is something like:

# Start the pipe
{:ok, pipe} = :erl_pipe_nif.start('/tmp/test.pipe')
# Tell the NIF we want a message as soon as data is ready. 
# This is powered by `enif_select`
:ok = :erl_pipe_nif.poll(pipe)
# This can obviously be done in `handle_info` of a GenServer or other OTP behaviour.
receive do
   {:select, ^pipe, _, _} -> 
      # Read the data. This still blocks, but as long as it is only called after the `:select`
      # message arrives, that's no big deal.
      :erl_pipe_nif.read(pipe, 10)
end

Anyway, thanks again @josevalim for making me check out the port option again. I hope the information here is enough to answer anyone else's questions in the future.


I have a stupid question. You are reading with:

{:ok, <<protocol_pattern_match>>} = :file.read(fifo, protocol_size)

Do you write enough bytes into the fifo so this read can complete? I expect yes but it is always best to ask. Start with the simple questions first. :smile:


Yes, but I understand what you're asking. I understand that file:read(Fifo, Size) will block until Size bytes are available. That's fine, but in my case I didn't want to block the gen_server process while waiting for those bytes to arrive. They may never arrive for the life of this FIFO, which means I need to be able to cancel or stop the pending read operation.

Well, the file:read operation blocks by design. Two solutions have already been mentioned: put the read in a separate process that sends you a message when it is done and that you can kill when you give up on it, or use ports, which are non-blocking.
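The first option can be sketched with Task.yield/2 and Task.shutdown/2: wait a bounded time for the reply, then kill the task if it has not answered. CancellableRead and run/2 are hypothetical names, and read_fun stands in for whatever blocking read you use.

```elixir
defmodule CancellableRead do
  # Runs the zero-arity `read_fun` in a Task and waits up to `timeout_ms`.
  # Returns {:ok, result}, {:error, :timeout} if the task was killed, or
  # {:error, {:exit, reason}} if the task exited (only seen when trapping exits).
  def run(read_fun, timeout_ms) do
    task = Task.async(read_fun)

    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, {:exit, reason}}
      nil -> {:error, :timeout}
    end
  end
end
```

Killing the task only stops the task process. With a non-raw :file handle the actual read is performed by a separate file I/O server process, so that read may keep blocking after the task is gone, which would be consistent with :file.close/1 hanging earlier in this thread.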


Right. I chose the port method and it seems to be working well!
