leifg

Convert receive loop into Stream

I am currently trying to download a large file via HTTP streaming. That’s hard enough, as the only examples I can find involve a receive loop of some kind. See the following example taken from this tutorial:

  defp async_response(conn, id) do
    :ok = :ibrowse.stream_next(id)

    receive do
      {:ibrowse_async_headers, ^id, '200', _headers} ->
        conn = Plug.Conn.send_chunked(conn, 200)
        # Here you might want to set proper headers to `conn`
        # based on `headers` from a response.

        async_response(conn, id)
      {:ibrowse_async_headers, ^id, status_code, _headers} ->
        {status_code_int, _} = :string.to_integer(status_code)
        # If a service responded with an error, we still need to send
        # this error to a client. Again, you might want to set
        # proper headers based on response.

        conn = Plug.Conn.send_chunked(conn, status_code_int)

        async_response(conn, id)
      {:ibrowse_async_response_timeout, ^id} ->
        Plug.Conn.put_status(conn, 408)
      {:error, :connection_closed_no_retry} ->
        Plug.Conn.put_status(conn, 502)
      {:ibrowse_async_response, ^id, data} ->
        # Forward the received body chunk (`data`) to the client.
        case Plug.Conn.chunk(conn, data) do
          {:ok, conn} ->
            async_response(conn, id)
          {:error, :closed} ->
            Logger.info "Client closed connection before receiving the last chunk"
            conn
          {:error, reason} ->
            Logger.info "Unexpected error, reason: #{inspect(reason)}"
            conn
        end
      {:ibrowse_async_response_end, ^id} ->
        conn
    end
  end

Ideally I would process the individual chunks further and use existing libraries for this, so having a stream would help greatly.

So how do I convert the receive loop pattern into a stream? Or even better: is there an HTTP client that directly returns a stream after making a request?

Marked As Solved

amarraja

I was going to try and do this, but my approach was wrong so I did a little digging and found this article:

Basically, they create a process under a supervisor that uses the HTTPoison streaming functionality, then set up a Stream.resource to request the next chunks. There is a linked GitHub repo which looks fairly generic, so you could just lift and shift it:

https://github.com/esl/flex
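The general idea of wrapping a receive loop in Stream.resource can be sketched without any HTTP library. This is only a minimal illustration, not the code from the linked repo: the module name `ReceiveStream`, the fake producer process, and the `{:next, ref}` / `{:chunk, ref, data}` / `{:done, ref}` message shapes are all assumptions standing in for whatever the HTTP client really sends. The `{:next, ref}` request plays the role that `:ibrowse.stream_next(id)` plays in the question.

```elixir
defmodule ReceiveStream do
  # Hypothetical helper: turns a message-based producer into a lazy stream.
  def stream(chunks, timeout \\ 5_000) do
    Stream.resource(
      # start_fun runs in the consuming process when enumeration begins
      fn -> start_producer(chunks) end,
      # next_fun asks for one chunk and waits for it (the old receive loop)
      fn state -> next_chunk(state, timeout) end,
      # after_fun cleans up, also when the consumer stops early
      fn {pid, _ref} -> Process.exit(pid, :kill) end
    )
  end

  # Stand-in for the HTTP client: a process that emits one chunk per request.
  defp start_producer(chunks) do
    consumer = self()
    ref = make_ref()
    pid = spawn(fn -> produce(chunks, consumer, ref) end)
    {pid, ref}
  end

  defp produce([], consumer, ref) do
    receive do
      {:next, ^ref} -> send(consumer, {:done, ref})
    end
  end

  defp produce([chunk | rest], consumer, ref) do
    receive do
      {:next, ^ref} ->
        send(consumer, {:chunk, ref, chunk})
        produce(rest, consumer, ref)
    end
  end

  defp next_chunk({pid, ref} = state, timeout) do
    send(pid, {:next, ref})

    receive do
      {:chunk, ^ref, data} -> {[data], state}
      {:done, ^ref} -> {:halt, state}
    after
      timeout -> raise "timed out waiting for the next chunk"
    end
  end
end
```

With that in place, the chunks compose with the usual Stream and Enum functions, for example `ReceiveStream.stream(["a", "b", "c"]) |> Stream.map(&String.upcase/1) |> Enum.to_list()`.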

Also Liked

peerreynders

The term stream is underspecified. The solution you are describing buffers data that HTTP pushes to the process (captured via a receive loop) so that it can wait in the buffer until a consumer pulls it out. The point being that there are

  • push streams and
  • pull streams

The Stream module furnishes pull streams: "Streams are composable, lazy enumerables" - i.e. a consumer drains data that is ready and waiting.

In functional programming, available data is typically pushed by calling a function to process it (which is what the receive loop does); in process-oriented programming it is sent (forwarded) to the process that needs to process it (see GenStage and Flow) - so push streams are much less common.
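A small sketch of what "lazy" and "pull" mean here. The module name `LazyDemo` is made up for illustration; the messages sent to the caller only exist to make visible which elements were actually computed.

```elixir
defmodule LazyDemo do
  # Returns a stream that notifies the caller each time an element is computed.
  def doubled(range) do
    caller = self()

    Stream.map(range, fn n ->
      send(caller, {:computed, n})
      n * 2
    end)
  end
end

# Building the stream does no work; nothing has been computed yet.
stream = LazyDemo.doubled(1..1_000_000)

# The consumer pulls two elements, so only two are ever computed.
first_two = Enum.take(stream, 2)
IO.inspect(first_two)
```

The consumer decides how much gets computed: the remaining elements of the range are never touched.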

peerreynders

That isn’t entirely accurate. The consumer simply communicates that it is ready to have a certain amount of data pushed to it - it doesn’t actively pull data.

So the difference is:

  • In a pull scenario the client can be blocked if no data is available - i.e. it isn’t going to get anything else done. This is more important for a GenServer which is a “client” than for, say, a GenStage consumer (which has one extremely narrowly defined responsibility anyway).
  • In an unconstrained push scenario a client process can be overwhelmed with data being pushed to it, which usually leads to the process mailbox growing which will have undesirable side effects.
  • This is why GenStage uses “push with backpressure”, i.e. consumers are never blocked nor are they overwhelmed as the producer is supposed to limit the flow through the GenStage pipeline.
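The demand idea can be sketched with plain processes. This is not GenStage's API, just an illustration of the protocol shape: `DemandDemo` and the `{:ask, ...}` / `{:events, ...}` messages are invented names, and for brevity this consumer does block in `receive`, whereas a real GenStage consumer handles events in callbacks.

```elixir
defmodule DemandDemo do
  # Producer: pushes data, but never more than the consumer asked for.
  def producer(items) do
    receive do
      {:ask, consumer, demand} ->
        {batch, rest} = Enum.split(items, demand)

        case rest do
          [] ->
            send(consumer, {:events, batch, :done})

          _ ->
            send(consumer, {:events, batch, :more})
            producer(rest)
        end
    end
  end

  # Consumer: announces how much it can handle, then processes what arrives.
  def consume(producer, demand, acc \\ []) do
    send(producer, {:ask, self(), demand})

    receive do
      {:events, batch, :more} -> consume(producer, demand, acc ++ batch)
      {:events, batch, :done} -> acc ++ batch
    end
  end
end
```

For example, `spawn(fn -> DemandDemo.producer(Enum.to_list(1..5)) end) |> DemandDemo.consume(2)` receives the batches `[1, 2]`, `[3, 4]` and `[5]`; the consumer's mailbox never holds more than one batch of the size it requested.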

So in my mind “pulling” always expresses the “risk” of being blocked.

It always helps to know when process execution will be blocked, given that code is strictly sequential inside any single process. There is nothing inherently “bad” about a process being blocked, because its capability could be highly focused and there could be absolutely nothing else for it to do while it is blocked. But in certain capacities some GenServer-based processes cannot afford to be blocked if they are to function well, so they are forced to “outsource” any “blocking activities” to other, secondary processes (GenServer docs: "handle_cast ... should be used sparingly" Why?).

In environments without lightweight processes, which rely on heavyweight threads, push-based streams like ReactiveX are much more common.

Inside a process, data is pushed by calling a function, between processes data is pushed by casting a message from producer to consumer (call being used to acknowledge receipt of data by the consumer).
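As a rough sketch of the between-process case, assuming a made-up `Sink` GenServer as the consumer: `push/2` is a fire-and-forget cast, while `push_ack/2` is a call that only returns once the consumer has taken the item.

```elixir
defmodule Sink do
  use GenServer

  # Client API (all names here are illustrative)
  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, [])
  def push(pid, item), do: GenServer.cast(pid, {:push, item})
  def push_ack(pid, item), do: GenServer.call(pid, {:push, item})
  def items(pid), do: GenServer.call(pid, :items)

  @impl true
  def init(items), do: {:ok, items}

  # cast: the producer does not wait, so nothing limits how fast it pushes
  @impl true
  def handle_cast({:push, item}, items), do: {:noreply, [item | items]}

  # call: the reply acknowledges receipt, which paces the producer
  @impl true
  def handle_call({:push, item}, _from, items), do: {:reply, :ok, [item | items]}
  def handle_call(:items, _from, items), do: {:reply, Enum.reverse(items), items}
end
```

After `{:ok, pid} = Sink.start_link()`, `Sink.push(pid, 1)` returns immediately, while `Sink.push_ack(pid, 2)` blocks the producer until the consumer replies with `:ok`.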

amarraja

Thanks @peerreynders, that got me thinking. Unfortunately I read your reply a few times and I think I confused myself!

So how would the original problem look as a push stream?

I’m making the assumption that when a request is made to download a large file, the client just receives data as it comes over the wire.

Let’s say we used GenStage with a single producer and consumer. The HTTP request would “push” data into the producer’s buffer and the consumer would “pull” when ready. I say “pull”; I know the data gets pushed, but the consumer still needs to indicate it can handle the demand - much the same way a stream will ask or wait for the next item.

Not sure I see the fundamental difference in this case.
