Pass Port's stdout to HTTP connection as chunked data

Hello,

I’m writing my first Elixir program and I’ve managed to do the hardest part of it, but I’m stuck at the final stage and I need some outside help now.

The program is an HTTP server that receives requests from the user, analyzes the parameters and launches an external executable. This executable writes binary data to its standard output and I need to pass this data to the client as a stream.

I’ve written this part as shown below, but I can’t get any data on the client side. Can you help me understand what’s wrong and how I can fix it?

# This function opens a Port, starts a chunked HTTP response for the client and transmits the data
defp make_response({:ok, datareq}, conn) do
  dataselect = System.find_executable("dataselect")
  port = Port.open({:spawn_executable, dataselect},
    [:stderr_to_stdout, :binary, :exit_status, args: Enum.concat(["-s", selections_file, "-o", "-"], fileslist)])
  Logger.notice("Sending response from dataselect #{inspect(port)}")
  conn
  |> Plug.Conn.put_resp_header("content-disposition", "attachment; filename=fdsnws-dataselect.mseed")
  |> Plug.Conn.put_resp_header("content-type", "application/vnd.fdsn.mseed")
  |> Plug.Conn.send_chunked(200)  # Initiate chunked response
  |> stream_dataselect_output(port)
end

# This function listens to the data from the port and sends it through Plug
defp stream_dataselect_output(conn, port) do
  receive do
    {:data, data} ->
      Logger.debug("Receiving data chunk for #{inspect(data)}")
      conn
      |> Plug.Conn.chunk(data)
      |> stream_dataselect_output(port)
    :eof ->
      conn
    {:exit_status, status} ->
      Logger.info("Exit status: #{status}")
      conn
  after 3000 ->
    Logger.info("Timeout waiting for dataselect")
    conn
  end
end

I always hit the timeout and get the error message here:

08:52:05.774 [notice] Sending response from dataselect #Port<0.18>

08:52:05.777 [info] Chunked 200 in 308ms

08:52:08.778 [info] Timeout waiting for dataselect

08:52:08.822 [error] GenServer #PID<0.677.0> terminating
** (FunctionClauseError) no function clause matching in Bandit.HTTP1.Handler.handle_info/2
    (bandit 1.0.0-pre.10) lib/thousand_island/handler.ex:5: Bandit.HTTP1.Handler.handle_info({#Port<0.18>, {:data, <<48, 48, 48, 48, 48, 53, 77, 32, 67, 73, 69, 76, 32, 48, 49, 72, 78, 90, 70, 82, 7, 231, 0, 1, 0, 0, 0, 0, 0, 0, 1, 104, 0, 200, 0, 1, 2, 32, 0, 2, 0, 0, 0, 0, 0, 64, ...>>}}, {%ThousandIsland.Socket{socket: #Port<0.17>, transport_module: ThousandIsland.Transports.TCP, read_timeout: 60000, span: %ThousandIsland.Telemetry{span_name: :connection, telemetry_span_context: #Reference<0.2562483382.3605266436.113476>, start_time: -576460748085329422, start_metadata: %{parent_telemetry_span_context: #Reference<0.2562483382.3605266433.114612>, remote_address: {127, 0, 0, 1}, remote_port: 34208, telemetry_span_context: #Reference<0.2562483382.3605266436.113476>}}}, %{handler_module: Bandit.HTTP1.Handler, http_1_enabled: true, http_2_enabled: true, opts: %{http_1: [], http_2: [], websocket: []}, plug: {Wsdataselect.Router, []}, requests_processed: 1, websocket_enabled: true}})
    (stdlib 4.3) gen_server.erl:1123: :gen_server.try_dispatch/4
    (stdlib 4.3) gen_server.erl:1200: :gen_server.handle_msg/6
    (stdlib 4.3) proc_lib.erl:240: :proc_lib.init_p_do_apply/3
Last message: {#Port<0.18>, {:data, <<48, 48, 48, 48, 48, 53, 77, 32, 67, 73, 69, 76, 32, 48, 49, 72, 78, 90, 70, 82, 7, 231, 0, 1, 0, 0, 0, 0, 0, 0, 1, 104, 0, 200, 0, 1, 2, 32, 0, 2, 0, 0, 0, 0, 0, 64, ...>>}}
State: {%ThousandIsland.Socket{socket: #Port<0.17>, transport_module: ThousandIsland.Transports.TCP, read_timeout: 60000, span: %ThousandIsland.Telemetry{span_name: :connection, telemetry_span_context: #Reference<0.2562483382.3605266436.113476>, start_time: -576460748085329422, start_metadata: %{parent_telemetry_span_context: #Reference<0.2562483382.3605266433.114612>, remote_address: {127, 0, 0, 1}, remote_port: 34208, telemetry_span_context: #Reference<0.2562483382.3605266436.113476>}}}, %{handler_module: Bandit.HTTP1.Handler, http_1_enabled: true, http_2_enabled: true, opts: %{http_1: [], http_2: [], websocket: []}, plug: {Wsdataselect.Router, []}, requests_processed: 1, websocket_enabled: true}}

Does anybody see what’s wrong here?

Thanks!

I’m not sure how your dataselect tool works, but I suspect the problem is that the external process will not exit (or will not flush its output) until you close the port, which is why you are getting a timeout. But if you close the port, the BEAM also closes the stream coming from the process, potentially truncating the output. This is a limitation of the BEAM and requires external tools to work around.

(I’m out and on my phone right now; I’m sure somebody else will write a better reply before I can.)

Maybe you can make it work with a port but I’d look into other solutions. Maybe pipe the output of dataselect to a temporary file and read that file with File.stream!, combining it with Enum.reduce_while to send chunks.

Edit: IO.binstream is more appropriate since the data is binary data.

Edit 2: Another option is System.cmd with the into: IO.stream() option.
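
For what it’s worth, here is a minimal sketch of the temporary-file idea, reading the file with IO.binstream and sending it with Enum.reduce_while. The module name, the stream_file function and the send_chunk parameter are made up for illustration; send_chunk is only a stand-in with the same return shape as Plug.Conn.chunk/2 ({:ok, conn} or {:error, reason}), so the snippet runs without Plug. The 64 KiB chunk size is an arbitrary choice.

```elixir
defmodule FileChunkSketch do
  # Hypothetical helper: streams the file at `path` in fixed-size binary
  # chunks and hands each one to `send_chunk`, which is assumed to behave
  # like Plug.Conn.chunk/2 and return {:ok, conn} or {:error, reason}.
  def stream_file(conn, path, send_chunk, chunk_size \\ 64 * 1024) do
    File.open!(path, [:read, :binary], fn file ->
      file
      |> IO.binstream(chunk_size)
      |> Enum.reduce_while(conn, fn data, conn ->
        case send_chunk.(conn, data) do
          # Chunk accepted: keep going with the updated conn.
          {:ok, conn} -> {:cont, conn}
          # Sending failed (for example, the client went away): stop reading.
          {:error, _reason} -> {:halt, conn}
        end
      end)
    end)
  end
end
```

In a Plug handler this would presumably be called after Plug.Conn.send_chunked(200), passing &Plug.Conn.chunk/2 as send_chunk.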

Thank you for your ideas. I appreciate it a lot!

I gave System.cmd with an IO.binstream() a quick try, but I must confess I was not able to get Enum.reduce_while/3 working together with Plug.Conn.chunk().

Therefore, for now, I write my output to a temporary file and transmit it to the user with Plug.Conn.send_file/5.

As a matter of fact, 95% of my requests will generate less than 1 MB, so the overhead of writing to a file before sending to the user is bearable.
I am also able to estimate the result size before running the request, so I return a 429 when there is too much data, rather than risk filling the temporary directory.

I must say I really enjoy coding in Elixir :slight_smile:

You’re not matching on the right shape of data. For example to match on incoming data, you need to match on {^port, {:data, data}}.

Also you should check the result of chunk/2. If it returns {:error, reason}, you should just return the conn.

Apologies for any formatting, I’m typing this on my phone.

Edit: see the Port docs for more details
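
To make that concrete, here is a rough sketch of the receive loop with the message shapes a port actually sends: every message is tagged with the port itself, so the patterns are {^port, {:data, data}} and {^port, {:exit_status, status}}. The module name and the send_chunk parameter are invented for the example; send_chunk stands in for Plug.Conn.chunk/2 (same {:ok, conn} or {:error, reason} return shape) so the snippet runs without Plug. The 3000 ms timeout is taken from the original code.

```elixir
defmodule PortStreamSketch do
  # Hypothetical helper: forwards everything the port writes to `send_chunk`
  # until the external program exits, a send fails, or nothing arrives
  # within `timeout` milliseconds.
  def stream_port(conn, port, send_chunk, timeout \\ 3_000) do
    receive do
      # Port messages are wrapped in a tuple tagged with the port itself.
      {^port, {:data, data}} ->
        case send_chunk.(conn, data) do
          {:ok, conn} -> stream_port(conn, port, send_chunk, timeout)
          # Sending failed (for example, the client went away): stop and return the conn.
          {:error, _reason} -> conn
        end

      # Sent because the port was opened with the :exit_status option.
      {^port, {:exit_status, _status}} ->
        conn
    after
      timeout -> conn
    end
  end
end
```

With Plug, the call would look something like stream_port(conn, port, &Plug.Conn.chunk/2) right after send_chunked(200).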


Also I noticed you’re using Bandit. In my testing, Bandit throws away the result of the underlying socket send in chunk/2 calls, which means you’ll never get an error if the client terminates before the end of the stream. If you’re capped at 1MB, this might not matter to you.

I’ve reported the Bandit issue on GitHub.

Thank you, with your pattern match it’s working like a charm!

Thank you also for your warning regarding Bandit. I’ll see if it’s a problem.


Glad it helped.

The Bandit issue has been fixed.