Convert receive loop into Stream

I am currently trying to download a large file via HTTP streaming. That’s hard enough as the only examples I find usually involve a receive loop of some kind of receive loop. See the following example taken from this tutorial:

  defp async_response(conn, id) do
    :ok = :ibrowse.stream_next(id)

    receive do
      {:ibrowse_async_headers, ^id, '200', _headers} ->
        conn = Plug.Conn.send_chunked(conn, 200)
        # Here you might want to set proper headers to `conn`
        # based on `headers` from a response.

        async_response(conn, id)
      {:ibrowse_async_headers, ^id, status_code, _headers} ->
        {status_code_int, _} = :string.to_integer(status_code)
        # If a service responded with an error, we still need to send
        # this error to a client. Again, you might want to set
        # proper headers based on response.

        conn = Plug.Conn.send_chunked(conn, status_code_int)

        async_response(conn, id)
      {:ibrowse_async_response_timeout, ^id} ->
        Plug.Conn.put_status(conn, 408)
      {:error, :connection_closed_no_retry} ->
        Plug.Conn.put_status(conn, 502)
      {:ibrowse_async_response, ^id, data} ->
        case Plug.Conn.chunk(conn, chunk) do
          {:ok, conn} ->
            async_response(conn, id)
          {:error, :closed} ->
            Logger.info "Client closed connection before receiving the last chunk"
            conn
          {:error, reason} ->
            Logger.info "Unexpected error, reason: #{inspect(reason)}"
            conn
        end
      {:ibrowse_async_response_end, ^id} ->
        conn
    end
  end

Ideally I process the individual chunks further and use existing libraries for this.

So having a stream would help greatly.

So how do I convert the receive loop pattern into a stream. Or even better: is there an HTTP client which directly returns a stream after making a request?

2 Likes

Have a look at Stream.resource/3. I’m not up on current http client features so don’t know if any include what you’re looking for.

I know about Stream.resource but have no idea how I can reconclie the receive loop with this.

I’m wondering if anyone had a similar problem and has a working solution.

I was going to try and do this, but my approach was wrong so I did a little digging and found this article:

Basically they create a process under a supervisor which uses the HTTPoison stream functionality, then setup stream resource to request the next chunks. There is a linked Github repo which looks fairly generic so you could just lift-and-shift

2 Likes

That looks exactly like what I am looking for, thank you so much!

The term stream is underspecified. The solution you are describing buffers data being pushed by HTTP onto the process which is captured via a receive loop so that it can wait in the buffer for a consumer to pull the data out of the buffer. The point being there are

  • push streams and
  • pull streams

The stream module furnishes pull streams: Streams are composable, lazy enumerables - i.e. a consumer drains data that is ready and waiting.

In functional programming available data is typically pushed by calling a function to process it (which is what the receive loop does) or in process oriented programming sent (forwarded) to the process that needs to process is (see GenStage and Flow) - so push streams are much less common.

3 Likes

Thanks @peerreynders, that got me thinking. Unfortunately I read your reply a few times and I think I confused myself!

So how would the original problem look as a push stream?

I’m making the assumption that when a request is made to download a large file, the client just receives data as it comes over the wire.

Let’s say we used GenStage with a single producer and consumer. The http request would “push” data into the producer’s buffer and the consumer will “pull” when ready. I say “pull”, I know the data gets pushed, but the consumer still needs to indicate it can handle the demand - much the same way a stream will ask or wait for the next item.

Not sure I see the fundemental difference in this case.

1 Like

That isn’t entirely accurate. The consumer simply communicates that it is ready to have a certain amount data pushed to it - it doesn’t actively pull data.

So the difference is:

  • In a pull scenario the client can be blocked if no data is available - i.e. it isn’t going to get anything else done. This is more important for a GenServer which is a “client” than and for example a GenStage consumer (which has one extremely narrowly defined responsibility anyway).
  • In an unconstrained push scenario a client process can be overwhelmed with data being pushed to it, which usually leads to the process mailbox growing which will have undesirable side effects.
  • This is why GenStage uses “push with backpressure”, i.e. consumers are never blocked nor are they overwhelmed as the producer is supposed to limit the flow through the GenStage pipeline.

So in my mind “pulling” always expresses the “risk” of being blocked.

It aways helps to know when process execution will be blocked given that code is strictly sequential inside any single process. There is nothing inherently “bad” about a process being blocked because its capability could be highly focused and there could be absolutely nothing else for it to do when it is blocked. But in certain capacities some GenServer based processes cannot afford to be blocked to function well, so that they are forced to “out source” any “blocking activities” to other secondary processes (GenServer docs: "handle_cast ... should be used sparingly" Why?).

In environments (without lightweight processes) that rely on heavyweight threads push based streams like ReactiveX are much more common.

Inside a process, data is pushed by calling a function, between processes data is pushed by casting a message from producer to consumer (call being used to acknowledge receipt of data by the consumer).

3 Likes