Can I stream receive from the network?

Hey everyone,

If I want to stream data from a file, it’s mega easy:

File.stream!("/path/to/file")
|> Stream.map(...)
|> Stream.filter(...)
|> ...consume the processed data here...
|> Stream.run

Is there a way to do the same with a network stream? If there is, I have no idea how to replace File.stream! with it. I dabbled in :gen_tcp but it does not seem to be compatible with Elixir’s Stream module (or IO for that matter).

What I am looking for is a data-origin-neutral way to stream receive data. In the above example that means I only want to be swapping out the top line and everything else must stay the same. Is this possible?

Thank you.

I’m not aware of anything that already exists for that. Which implies in no way that there isn’t.

It’s not clear to me how network errors should be handled in a stream.

That said, I could imagine something like:

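defmodule TCP do
  def stream!(opts) do
    Stream.resource(
      # start: spin up a connection process and open the socket
      fn ->
        {:ok, pid} = TCP.DynamicSupervisor.start_child(opts)
        GenServer.call(pid, :open)
        pid
      end,
      # next: pull one chunk at a time, halt on anything else
      fn pid ->
        case GenServer.call(pid, :next) do
          {:ok, data} -> {[data], pid}
          _ -> {:halt, pid}
        end
      end,
      # after: close the connection when the stream is done
      fn pid ->
        GenServer.call(pid, :close)
      end
    )
  end
end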

The dynamic supervisor would start up a GenServer that responded to those calls and did something appropriate with them.

NOTE: Assume there are probably errors
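For comparison, here is a minimal sketch of the same idea without the supervisor and GenServer, holding a passive :gen_tcp socket directly inside Stream.resource/3. The module name TcpStream and the 5 second default timeout are assumptions made for illustration, and any receive error simply ends the stream, which sidesteps rather than answers the error-handling question above.

```elixir
defmodule TcpStream do
  # Hypothetical module: wraps a passive-mode TCP socket in a lazy stream.
  def stream!(host, port, timeout \\ 5_000) do
    Stream.resource(
      # start: connect in passive mode so data is only read when asked for
      fn ->
        {:ok, socket} =
          :gen_tcp.connect(
            String.to_charlist(host),
            port,
            [:binary, active: false, packet: :raw],
            timeout
          )

        socket
      end,
      # next: a length of 0 means "whatever bytes are currently available"
      fn socket ->
        case :gen_tcp.recv(socket, 0, timeout) do
          {:ok, data} -> {[data], socket}
          # closed, timeout and other errors all just end the stream here
          {:error, _reason} -> {:halt, socket}
        end
      end,
      # after: always close the socket, even if the consumer stops early
      fn socket -> :gen_tcp.close(socket) end
    )
  end
end
```

With that in place only the first line of the pipeline changes, e.g. TcpStream.stream!("localhost", 4000) |> Stream.map(...) |> Stream.run(). Note that chunk boundaries are arbitrary: a chunk is whatever the socket had available, not a line.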

It seems it is me who has to create a GenServer proxy which handles the same messages while allowing different kinds of streams encapsulated in it. :frowning: Or a struct plus protocol.

As much as we bash it sometimes here, Go’s uniform I/O is extremely useful.

I’m not entirely sure that your networked use case is a good fit for “composable, lazy enumerables” (Stream - Elixir).

I could be off base, but I view Elixir Streams as a largely sequential programming construct whose main feature is laziness.

When it comes to networked, i.e. distributed, communication, concurrent programming can actually make certain things simpler: processes are supposed to be used to implement protocols, and processes aren’t supposed to be a big deal when they are appropriate.

In this case, I would like to feed data to a function that requires a normal Elixir Stream (basically an object that is an Enumerable and a Collectable, namely IO.Stream). It’s quite easy to figure out how to do that with files and even wrap a stream around a string but I cannot find a way to feed it a network stream (random example: the VLC player fire-hosing a movie on the local network) to Elixir’s Stream functions like map and filter. I am looking for a way to have an IO.Stream which reads data from network and can write data to network (bi-directional).

In Go, I can just have Reader and Writer instances which internally can be based off of anything – files, buffers, network sockets, you name it. In Elixir, I cannot find a way to do it with connected sockets.

Simply put, I am writing a tool that accepts a stream, filters / transforms it, and outputs another stream. It can work with streams based on files and string buffers but not based on connected sockets. So I guess I will end up just doing something like this:

curl http://somewhere.com/source.data | my_tool >transformed.data

…and just pass the :stdio stream to my function.
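A rough sketch of what that could look like: the tool takes any Enumerable and returns a lazy stream, so the caller decides where the data comes from. The module name MyTool and the particular map/reject steps are placeholders invented for illustration, and a StringIO device stands in for :stdio so the example runs on its own.

```elixir
defmodule MyTool do
  # Hypothetical tool: accepts any enumerable of lines, returns a lazy stream.
  def transform(lines) do
    lines
    |> Stream.map(&String.upcase/1)
    # drop empty lines
    |> Stream.reject(&(&1 == "\n"))
  end
end

# An in-memory device plays the role of :stdio here.
{:ok, device} = StringIO.open("keep\n\nthis\n")

result =
  device
  |> IO.binstream(:line)
  |> MyTool.transform()
  |> Enum.to_list()

IO.inspect(result)
```

In the curl pipeline the source would be IO.binstream(:stdio, :line) instead of the StringIO device, and the result could be written back out with Enum.into(IO.binstream(:stdio, :line)).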

IO.binstream/2 seems to accept a pid, which suggests that there is some kind of protocol a process can follow to act as the source of a stream (keeping in mind that data doesn’t push through streams).

The problem is exactly that, I cannot get a PID for a connected network socket. See below.

For a quick test (this brings up a small HTTP server):

cd /tmp
echo '.hello{}' > 1.css
erl -s inets -eval 'inets:start(httpd,[{server_name,"NAME"},{document_root, "."},{server_root, "."},{port, 4000},{mime_types,[{"html","text/html"},{"htm","text/html"},{"js","text/javascript"},{"css","text/css"},{"gif","image/gif"},{"jpg","image/jpeg"},{"jpeg","image/jpeg"},{"png","image/png"}]}]).'

I tried getting the PID contained inside the Port that :gen_tcp.connect returns and duplicating a request that curl executes successfully:

{:ok, port} = :gen_tcp.connect('localhost', 4000, [])
pid = Port.info(port) |> Keyword.get(:connected)
IO.binwrite(pid, "GET /1.css HTTP/1.1\nHost: localhost:4000\nUser-Agent: curl/7.61.1\nAccept: */*\n\n")

…and that hangs.
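One likely reason for the hang, as far as I can tell: the :connected entry of Port.info/1 is the process that owns the port, which here is the shell itself, so IO.binwrite/2 sends an I/O request to the calling process and then waits for a reply that never arrives. The socket has to be driven through :gen_tcp.send/2 and :gen_tcp.recv/3 instead. Below is a minimal sketch of the same request done that way; the module name RawHttp and the "Connection: close" header are assumptions added for illustration, and the line endings are \r\n because that is what HTTP/1.1 expects.

```elixir
defmodule RawHttp do
  # Hypothetical helper: sends one GET request over a raw socket and returns
  # everything the server sends back until it closes the connection.
  def get(host, port, path) do
    {:ok, socket} =
      :gen_tcp.connect(String.to_charlist(host), port, [:binary, active: false])

    request =
      "GET #{path} HTTP/1.1\r\nHost: #{host}:#{port}\r\nConnection: close\r\n\r\n"

    :ok = :gen_tcp.send(socket, request)
    response = read_all(socket, "")
    :gen_tcp.close(socket)
    response
  end

  # Keep reading until the peer closes; any other error (e.g. a timeout)
  # crashes with a CaseClauseError, which is acceptable for a quick test.
  defp read_all(socket, acc) do
    case :gen_tcp.recv(socket, 0, 5_000) do
      {:ok, data} -> read_all(socket, acc <> data)
      {:error, :closed} -> acc
    end
  end
end
```

Against the inets server above this would be called as RawHttp.get("localhost", 4000, "/1.css").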

At this point I accepted that I severely misunderstand something so I came here looking for help.

You’ve already been pointed to resource/3 which seems to be used here.

The other option seems to be going low level, i.e.: The Erlang I/O Protocol

I cannot understand the relevance of the article after I’ve read it. If you are telling me I have to devise my own means to somehow model generic input/output stream (that can also use a connected socket underneath, not only files or string IO objects) then yes, I am kind of gathering that myself while scanning Elixir’s stdlib. I’ll see if I can find anything more.

Hmm, maybe. Scanning through it quickly, it doesn’t seem to directly address my problem.

Not sure if that’s what you’re looking for, but here are my two cents: hackney can stream using the async option.

You could then use GenStage to do the processing. Here’s a somewhat related comment: https://github.com/edgurgel/httpoison/issues/103

@yurko Does hackney give you a PID that is the connected network socket?

It gives you a unique reference. You can see it if you run :hackney.get('http://example.org/stream', [], <<>>, [:async]) in iex.

My guess is that the device pid for IO.stream/2 or IO.binstream/2 is an IO server (process).

So I suspect that stream/binstream are responsible for sending {:io_request, from, reply_as, request} tuples to the specified IO server which is expected to serve {:io_reply, reply_as, reply} responses.

So the IO server would be entirely responsible for managing the network socket, including managing network errors and buffering any available payload that hasn’t been (io_)requested yet.
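To make that concrete, here is a rough sketch of such an IO server, assuming it only needs to answer the two requests that IO.binread/2 and IO.binwrite/2 end up sending for a pid device (get_chars and put_chars). The module name SocketDevice is made up, everything else in the protocol is answered with {:error, :request}, and error handling is deliberately thin.

```elixir
defmodule SocketDevice do
  # Hypothetical IO server: owns a passive TCP socket and answers io_requests.
  def start_link(host, port) do
    parent = self()
    ref = make_ref()

    pid =
      spawn_link(fn ->
        {:ok, socket} =
          :gen_tcp.connect(String.to_charlist(host), port, [:binary, active: false])

        send(parent, {ref, :ready})
        loop(%{socket: socket, buffer: "", closed: false})
      end)

    receive do
      {^ref, :ready} -> {:ok, pid}
    after
      5_000 -> {:error, :timeout}
    end
  end

  defp loop(state) do
    receive do
      {:io_request, from, reply_as, request} ->
        {reply, state} = handle(request, state)
        send(from, {:io_reply, reply_as, reply})
        loop(state)
    end
  end

  # Writes go straight to the socket; the result (:ok or {:error, reason}) is the reply.
  defp handle({:put_chars, _encoding, chars}, state),
    do: {:gen_tcp.send(state.socket, chars), state}

  defp handle({:get_chars, _encoding, _prompt, n}, state), do: read(state, n)
  defp handle(_other, state), do: {{:error, :request}, state}

  # Enough bytes buffered: hand out exactly n of them.
  defp read(%{buffer: buffer} = state, n) when byte_size(buffer) >= n do
    <<chunk::binary-size(n), rest::binary>> = buffer
    {chunk, %{state | buffer: rest}}
  end

  # Peer has closed: flush any leftover bytes once, then report :eof.
  defp read(%{closed: true, buffer: ""} = state, _n), do: {:eof, state}
  defp read(%{closed: true, buffer: buffer} = state, _n), do: {buffer, %{state | buffer: ""}}

  # Otherwise block on the socket and buffer whatever arrives.
  defp read(%{socket: socket, buffer: buffer} = state, n) do
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} -> read(%{state | buffer: buffer <> data}, n)
      {:error, :closed} -> read(%{state | closed: true}, n)
      {:error, reason} -> {{:error, reason}, state}
    end
  end
end
```

Given {:ok, device} = SocketDevice.start_link("localhost", 4000), the pid can then be used like any other device: IO.binwrite(device, data) to send and IO.binstream(device, 4096) as the head of a Stream pipeline. Line-based reads (:line) are not covered by this sketch.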

That being said there could be an easier way …

Also:
HTTP Streaming in Elixir
Gun

Thank you. While I moved on and this is no longer urgently relevant, it remained a point of interest which I will eventually pursue.

I like Go’s Reader and Writer way of doing things and I believe every language has to strive to emulate such concepts.