I have a GenServer that reads/writes to a Linux Pipe or Fifo. I need to do:
{:ok, fifo} = :file.open('/path/to/fifo', [:read, :write, :binary])
{:ok, <<protocol_pattern_match>>} = :file.read(fifo, protocol_size)
Except :file.read/2
blocks the entire calling process until it is complete similar to read()
in C. This is fine because i should be able to just do task = Task.async(:file, :read, [protocol_size])
and get the result in
handle_info/2
. Maybe i misunderstood the docs, but that doesn’t seem to be working for me. I’m expecting to get handle_info({ref, {:ok, <<protocol_pattern_match>>}, %{task: %{ref: ref}})
but that doesn’t seem to happen.
My other issue is that Task.shutdown/2
or :file.close/1
do not seem to be working as expected.
the docs for Task.shutdown/2
say when a calling process exits, the task should exit, but even using
Task.shutdown(state.task, :brutal_kill)
doesn’t allow me to call :file.close(state.fifo)
in terminate/2
. (It just blocks forever)
anyway here’s the entire GenServer implementation:
defmodule PipeWorker do
@moduledoc """
Proxy for IO operations.
"""
use GenServer
require Logger
def start_link(pipe_name) do
GenServer.start_link(__MODULE__, pipe_name)
end
def close(pipe) do
GenServer.stop(pipe, :normal)
end
def read(pipe, size) do
GenServer.call(pipe, {:read, [size]}, :infinity)
end
def write(pipe, packet) do
GenServer.call(pipe, {:write, [packet]}, :infinity)
end
def init(pipe_name) do
with {_, 0} <- System.cmd("mkfifo", [pipe_name]),
{:ok, pipe} <- :file.open(to_charlist(pipe_name), [:read, :write, :binary]) do
{:ok, %{pipe_name: pipe_name, pipe: pipe, task: nil, caller: nil}}
else
{:error, _} = error -> {:stop, error}
{_, _num} -> {:stop, {:error, "mkfifo"}}
end
end
def terminate(_, state) do
Logger.warn("PipeWorker #{state.pipe_name} exit")
state.task && Task.shutdown(state.task, :brutal_kill)
Logger.warn("Pipe Task shut down")
IO.inspect(state.pipe, label: "pipe")
# :file.close(state.pipe) # blocks indefinitely no matter what. Shell becomes unresponsive.
# Logger.warn("Pipe closed")
File.rm!(state.pipe_name)
Logger.warn("Pipe removed")
end
def handle_call({cmd, args}, {pid, _} = _from, state) do
IO.inspect([state.pipe | args], label: "Pipe task args")
task = Task.async(:file, cmd, [state.pipe | args])
IO.inspect(task, label: "Pipe task")
{:reply, task.ref, %{state | task: task, caller: pid}}
end
# This is never called?
def handle_info({ref, result}, %{task: %{ref: ref}, caller: pid} = state) do
IO.inspect({ref, result}, label: "Task result")
send(pid, {__MODULE__, ref, result})
{:noreply, %{state | task: nil, caller: nil}}
end
end