Can I stream receive from the network?

Hey everyone,

If I want to stream data from a file, it’s mega easy:

File.stream!("/path/to/file")
|> Stream.map(...)
|> Stream.filter(...)
|> ...consume the processed data here...
|> Stream.run

Is there a way to do the same with a network stream? If there is, I have no idea how to replace File.stream! with it. I dabbled in :gen_tcp but it does not seem to be compatible with Elixir’s Stream module (or IO for that matter).

What I am looking for is a data-origin-neutral way to receive data as a stream. In the above example that means I only want to swap out the top line; everything else must stay the same. Is this possible?

Thank you.

I’m not aware of anything that already exists for that, which in no way implies that there isn’t.

It’s not clear to me how network errors should be handled in a stream.

That said, I could imagine something like:

defmodule TCP do
  def stream!(opts) do
    Stream.resource(
      fn ->
        {:ok, pid} = TCP.DynamicSupervisor.start_child(opts)
        GenServer.call(pid, :open)
        pid
      end,
      fn pid ->
        case GenServer.call(pid, :next) do
          {:ok, data} -> {[data], pid}
          _ -> {:halt, pid}
        end
      end,
      fn pid ->
        GenServer.call(pid, :close)
      end
    )
  end
end

The dynamic supervisor would start up a GenServer that responded to those calls and did something appropriate with them.

NOTE: Assume there are probably errors
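For what it’s worth, a minimal sketch of the kind of worker such a dynamic supervisor could start (the module name, the assumption that TCP.DynamicSupervisor.start_child/1 starts it, that opts is a keyword list like [host: "localhost", port: 4000], and the use of :gen_tcp in passive mode are all just illustrative choices):

defmodule TCP.Worker do
  use GenServer

  # Hypothetical worker for the TCP.stream!/1 sketch above: it owns a
  # passive-mode socket and hands out chunks on demand.

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts), do: {:ok, %{opts: opts, socket: nil}}

  @impl true
  def handle_call(:open, _from, %{opts: opts} = state) do
    {:ok, socket} =
      :gen_tcp.connect(to_charlist(opts[:host]), opts[:port], [:binary, active: false])

    {:reply, :ok, %{state | socket: socket}}
  end

  def handle_call(:next, _from, %{socket: socket} = state) do
    # Block until some data arrives; any error (including :closed) is
    # passed back so the stream halts.
    case :gen_tcp.recv(socket, 0) do
      {:ok, data} -> {:reply, {:ok, data}, state}
      {:error, reason} -> {:reply, {:error, reason}, state}
    end
  end

  def handle_call(:close, _from, %{socket: socket} = state) do
    if socket, do: :gen_tcp.close(socket)
    {:reply, :ok, %{state | socket: nil}}
  end
end

A real version would also need to think about GenServer.call timeouts versus how long recv/2 may block.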


It seems it is up to me to create a GenServer proxy that handles the same messages while encapsulating different kinds of streams. :frowning: Or a struct plus a protocol.
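For reference, the struct-plus-protocol variant would look roughly like this (NetStream is a made-up name; error handling is ignored and the Collectable half is left out):

defmodule NetStream do
  # Hypothetical struct that only carries connection options; the actual
  # streaming is deferred to Stream.resource/3 inside Enumerable.reduce/3.
  defstruct [:host, :port]

  defimpl Enumerable do
    # The struct's fields are all we need here.
    def reduce(%{host: host, port: port}, acc, fun) do
      Stream.resource(
        fn ->
          {:ok, socket} =
            :gen_tcp.connect(to_charlist(host), port, [:binary, active: false])

          socket
        end,
        fn socket ->
          case :gen_tcp.recv(socket, 0) do
            {:ok, data} -> {[data], socket}
            {:error, _} -> {:halt, socket}
          end
        end,
        &:gen_tcp.close/1
      )
      |> Enumerable.reduce(acc, fun)
    end

    # Make Enum fall back to reduce/3 for everything else.
    def count(_), do: {:error, __MODULE__}
    def member?(_, _), do: {:error, __MODULE__}
    def slice(_), do: {:error, __MODULE__}
  end
end

With that, %NetStream{host: "localhost", port: 4000} can be piped into Stream.map and friends like any other enumerable.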

As much as we bash it sometimes here, Go’s uniform I/O is extremely useful.

I’m not entirely sure that your networked use case is a good fit for “composable, lazy enumerables” (Stream - Elixir).

I could be off base, but I view Elixir Streams as a largely sequential programming construct whose main feature is laziness.

When it comes to networked, i.e. distributed, communication, concurrent programming can actually make certain things simpler - processes are supposed to be used to implement protocols, and spawning a process isn’t supposed to be a big deal when it’s appropriate.


In this case, I would like to feed data to a function that requires a normal Elixir Stream (basically an object that is both an Enumerable and a Collectable, namely IO.Stream). It’s quite easy to figure out how to do that with files, and even to wrap a stream around a string, but I cannot find a way to feed a network stream (random example: the VLC player fire-hosing a movie on the local network) to Elixir’s Stream functions like map and filter. I am looking for a way to have an IO.Stream that reads data from the network and can write data to the network (bi-directional).

In Go, I can just have Reader and Writer instances which internally can be based on anything – files, buffers, network sockets, you name it. In Elixir, I cannot find a way to do this with connected sockets.

Simply put, I am writing a tool that accepts a stream, filters / transforms it, and outputs another stream. It can work with streams based on files and string buffers but not based on connected sockets. So I guess I will end up just doing something like this:

curl http://somewhere.com/source.data | my_tool >transformed.data

…and just pass the :stdio stream to my function.
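For completeness, the stdio-based version can look something like this (String.upcase/1 is just a placeholder for the real transform):

IO.binstream(:stdio, :line)
|> Stream.map(&String.upcase/1)
|> Stream.into(IO.binstream(:stdio, :line))
|> Stream.run()

which is exactly the same shape as the File.stream! pipeline at the top of the thread.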

IO.binstream/2 seems to accept a pid, which suggests that there is some kind of protocol a process can follow to act as the source of a stream (keeping in mind that data doesn’t push through streams).

The problem is exactly that, I cannot get a PID for a connected network socket. See below.

For a quick test (this brings up a small HTTP server):

cd /tmp
echo '.hello{}' > 1.css
erl -s inets -eval 'inets:start(httpd,[{server_name,"NAME"},{document_root, "."},{server_root, "."},{port, 4000},{mime_types,[{"html","text/html"},{"htm","text/html"},{"js","text/javascript"},{"css","text/css"},{"gif","image/gif"},{"jpg","image/jpeg"},{"jpeg","image/jpeg"},{"png","image/png"}]}]).'

I tried getting the PID contained inside the Port that :gen_tcp returns to you and duplicating a request that curl executes successfully:

{:ok, port} = :gen_tcp.connect('localhost', 4000, [])
pid = Port.info(port) |> Keyword.get(:connected)
IO.binwrite(pid, "GET /1.css HTTP/1.1\nHost: localhost:4000\nUser-Agent: curl/7.61.1\nAccept: */*\n\n")

…and that hangs.

At this point I accepted that I severely misunderstand something so I came here looking for help.

You’ve already been pointed to resource/3 which seems to be used here.

The other option seems to be going low level, i.e.: The Erlang I/O Protocol

I cannot understand the relevance of the article after I’ve read it. If you are telling me I have to devise my own means to somehow model a generic input/output stream (one that can also use a connected socket underneath, not only files or string IO objects), then yes, I am kind of gathering that myself while scanning Elixir’s stdlib. I’ll see if I can find anything more.

Hmm, maybe. Scanning through it quickly, it doesn’t seem to directly address my problem.

Not sure if that’s what you’re looking for, but here are my two cents: hackney can stream using the async option:

you could then use GenStage to do the processing; here’s a somewhat related comment: Close an async request manually · Issue #103 · edgurgel/httpoison · GitHub
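If you want a plain Stream out of it instead, a rough sketch wrapping hackney’s async-once mode in Stream.resource/3 could look like this (status/header messages are skipped, and any error simply ends the stream, which a real implementation shouldn’t do):

defmodule HackneyStream do
  # Rough sketch: hackney's async-once mode wrapped in Stream.resource/3.
  def get!(url) do
    Stream.resource(
      fn ->
        {:ok, ref} = :hackney.get(url, [], <<>>, [{:async, :once}])
        ref
      end,
      fn ref ->
        receive do
          {:hackney_response, ^ref, :done} ->
            {:halt, ref}

          {:hackney_response, ^ref, {:error, _reason}} ->
            {:halt, ref}

          {:hackney_response, ^ref, chunk} when is_binary(chunk) ->
            # Ask hackney for the next message before emitting this chunk.
            :hackney.stream_next(ref)
            {[chunk], ref}

          {:hackney_response, ^ref, _status_or_headers} ->
            :hackney.stream_next(ref)
            {[], ref}
        end
      end,
      fn ref -> :hackney.close(ref) end
    )
  end
end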

@yurko Does hackney give you a PID that is the connected network socket?

It gives you a unique reference; you can see it if you do :hackney.get('http://example.org/stream', [], <<>>, [:async]) in iex

My guess is that the device pid for IO.stream/2 or IO.binstream/2 is an IO server (process).

So I suspect that stream/binstream are responsible for sending {:io_request, from, reply_as, request} tuples to the specified IO server which is expected to serve {:io_reply, reply_as, reply} responses.

So the IO server would be entirely responsible for managing the network socket, including managing network errors and buffering any available payload that hasn’t been (io_)requested yet.
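To make that concrete, here is a hypothetical, deliberately minimal IO server that just serves a fixed list of binary chunks; it ignores encodings and only understands get_chars/get_line requests. A socket-backed version would do its :gen_tcp.recv where the chunks list is consumed here:

defmodule FakeIODevice do
  # Minimal I/O server sketch: serves pre-made binary chunks to whoever
  # talks the Erlang I/O protocol to it (for example IO.binstream/2).

  def start_link(chunks) when is_list(chunks) do
    {:ok, spawn_link(fn -> loop(chunks) end)}
  end

  defp loop(chunks) do
    receive do
      # get_chars arrives as a 3- or 4-tuple depending on the caller/OTP version
      {:io_request, from, reply_as, {:get_chars, _prompt, _n}} ->
        reply_next(from, reply_as, chunks)

      {:io_request, from, reply_as, {:get_chars, _encoding, _prompt, _n}} ->
        reply_next(from, reply_as, chunks)

      {:io_request, from, reply_as, {:get_line, _encoding, _prompt}} ->
        reply_next(from, reply_as, chunks)

      {:io_request, from, reply_as, _unsupported} ->
        send(from, {:io_reply, reply_as, {:error, :request}})
        loop(chunks)
    end
  end

  defp reply_next(from, reply_as, []) do
    send(from, {:io_reply, reply_as, :eof})
    loop([])
  end

  defp reply_next(from, reply_as, [chunk | rest]) do
    send(from, {:io_reply, reply_as, chunk})
    loop(rest)
  end
end

Something like {:ok, device} = FakeIODevice.start_link(["hello ", "world"]) followed by IO.binstream(device, 1024) |> Enum.to_list() should then yield the chunks back.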

That being said there could be an easier way …

Also:
HTTP Streaming in Elixir
Gun


Thank you. While I moved on and this is no longer urgently relevant, it remained a point of interest which I will eventually pursue.

I like Go’s Reader and Writer way of doing things and I believe every language has to strive to emulate such concepts.

Curious if you ever made any progress on this problem. I have found myself trying to implement a “source agnostic” streaming protocol and seem to have run into exactly the same issues you did.

I am probably out of my depth here, but it seems like a fundamental barrier might be that (most?) HTTP servers operate on a “push” model (or maybe it is part of the protocol itself?) where the client is expected to listen for the next chunk, whereas Elixir streams wait for the caller to “pull” the next part? So at minimum you are looking at implementing an intermediate process to handle the listening and then respond to the caller? But then I have to wonder how the Go API you mention works…

I’m curious about the theory here even if, practically, I end up just having to read the entire response into memory and then stream that :slight_smile:

Yes, but that’s not a problem; you can have a wrapper – the one that provides you a generic network stream abstraction – that simply reads ahead (or even reads it all) and then just serves you the data one chunk at a time.

Golang has interfaces (programming by contract), and you can get a Reader or Writer for practically anything and pass it to a function that uses either one while being 100% oblivious to the underlying implementation.

With Golang I have basically streamed a file uploaded to my app directly to S3 without ever having to store a single byte on the app server, for example.

The extremely useful rclone tool is written in Golang and uses and abuses the above idea all the time, to great success.


As others have pointed out, something similar-ish can be achieved with Stream.resource; you can wrap stuff with it and provide it as an abstraction. It should be doable, but I moved on a long time ago and figured I wouldn’t try to piss against the wind. Truth is, most code does not need to be abstract and oblivious to the underlying implementation unless you’re writing a library or a generic tool… and both are things I do very rarely.

…which is what I’m doing, so I’ve self-assigned this particular problem.

Yes but that’s not a problem; you can have a wrapper

By not a problem, you mean it’s technically possible? I’m trying to minimize deps in my own package, so I’m trying to make this work with httpc. It has a stream option, but it assumes the caller process will handle receiving each chunk, which is awkward at best. I’ll probably just read the entire response, stream that, and call it good enough for now.

Yes.

You should be able to handle that with Stream.resource though.
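For example, a rough sketch along those lines, wrapping httpc’s asynchronous streaming in Stream.resource/3 (redirects and timeouts are ignored, and responses that httpc decides not to stream arrive as a single complete message which this sketch doesn’t handle):

defmodule HttpcStream do
  # Rough sketch: httpc with {:sync, false} and {:stream, :self}, so the
  # receive loop lives inside the stream instead of in the caller's code.
  def get!(url) when is_binary(url) do
    Stream.resource(
      fn ->
        {:ok, _} = Application.ensure_all_started(:inets)

        {:ok, request_id} =
          :httpc.request(:get, {String.to_charlist(url), []}, [],
            sync: false,
            stream: :self
          )

        request_id
      end,
      fn request_id ->
        receive do
          {:http, {^request_id, :stream_start, _headers}} -> {[], request_id}
          {:http, {^request_id, :stream, body_part}} -> {[body_part], request_id}
          {:http, {^request_id, :stream_end, _headers}} -> {:halt, request_id}
          {:http, {^request_id, {:error, _reason}}} -> {:halt, request_id}
        end
      end,
      fn request_id -> :httpc.cancel_request(request_id) end
    )
  end
end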

We have a process in our app that uses a GenStage producer to stream and parse an HTTP response in a single process. We use the Mint HTTP library in passive socket mode to receive chunks of binaries from the socket, passing them to the Saxy XML parser.

As the comment above says, you should be able to achieve something similar using Stream.resource. I’ve made an example that might help get you going: http-stream.exs · GitHub
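Not the linked gist, but the general shape of that Mint-in-passive-mode approach looks roughly like this (error handling, TLS options, query strings and redirects are all glossed over):

defmodule HTTPStream do
  # Rough sketch: stream an HTTP response body with Stream.resource/3 and
  # Mint in passive mode, pulling from the socket only when asked.
  def get!(url) do
    uri = URI.parse(url)
    scheme = if uri.scheme == "https", do: :https, else: :http

    Stream.resource(
      fn ->
        {:ok, conn} = Mint.HTTP.connect(scheme, uri.host, uri.port, mode: :passive)
        {:ok, conn, ref} = Mint.HTTP.request(conn, "GET", uri.path || "/", [], nil)
        {conn, ref}
      end,
      fn
        :done ->
          {:halt, :done}

        {conn, ref} ->
          # In passive mode we explicitly ask the socket for more data.
          {:ok, conn, responses} = Mint.HTTP.recv(conn, 0, 5_000)
          data = for {:data, ^ref, chunk} <- responses, do: chunk

          if Enum.any?(responses, &match?({:done, ^ref}, &1)) do
            Mint.HTTP.close(conn)
            {data, :done}
          else
            {data, {conn, ref}}
          end
      end,
      fn
        :done -> :ok
        {conn, _ref} -> Mint.HTTP.close(conn)
      end
    )
  end
end

Against the test server from earlier in the thread, HTTPStream.get!("http://localhost:4000/1.css") |> Enum.each(&IO.write/1) should print the body as it arrives.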


Hey, you know what? I’ll mark your answer as a solution, 5 years later. :smiley:

That code definitely gets the job done in this particular scenario and I’ve written a very similar one before.

The only thing I’d change is to extract the three functions passed to Stream.resource, but it’s readable enough.
