GenStage: How to cancel a Flow from the producer?

I am creating a finite GenStage producer that reads the body of a hackney http request. Ideally, I’d like to hook this Producer up to a Flow using Flow.from_stage/1. I’ve been able to create a basic producer that reads the request, but don’t know how to shut down the flow after the body has been read. Is this possible?

A little more background

I’m trying to write a streaming file transfer application that gets a url (via hackney) and streams the response to an AWS bucket. I send off the http request and then read the request status and headers. If I get a good status code, I start streaming the body to AWS via Flow. Otherwise, I close the connection and return immediately.

I’ve tried using Stream.resource/3 to create a stream that reads the body of the response, and pass that to the Flow using Flow.from_enumerable/1. However, it turns out Flow.from_enumerable/1 reads the stream in a new process, and you can’t read a hackney response from a process that doesn’t own the connection.

So my thought was to create my own Producer that I can transfer ownership of the hackney connection to, and have it send the results into the flow using Flow.from_stage/1. However, when I reach the end of the body, how do I shutdown the entire flow like is done when you use Flow.from_enumerable/1 on a finite collection?

1 Like

Excellent question! You can send a notification from the producer to let the consumers know the data is over. Something like:

GenStage.async_notify(self(), {:producer, :done})

Notifications are really nice because they are always delivered to all consumers, regardless of your dispatcher strategy, and keeping the same ordering as the events dispatched. Here is one example of us using it in GenStage itself:

https://github.com/elixir-lang/gen_stage/blob/master/lib/gen_stage/streamer.ex#L46

I believe we should document it. Could you please send a pull request or open up an issue so we don’t forget to make this clearer? Thank you!

2 Likes

Thank you so much for your help! That works great! I just submitted an issue for documentation here: https://github.com/elixir-lang/gen_stage/issues/85.