Manually controlled Stream

I want to create a stream that is manually controller from another process e.g a GenServer. How would I go about that? There is a snippet of what I would like to achieve:

{:ok, stream} = ManualStream.new

spawn_link fn ->
  for i <- 1..10000000 do
    Process.sleep 1000
    ManualStream.push(stream, i * i)
  end
  ManualStream.close(stream)
end

stream
|> Stream.take(5)
|> Enum.each(&IO.inspect/1)

Thanks a lot!

Can you give us more information on which problem you are trying to solve? Which data do you want to send between process? What is the purpose? The answer may vary depending on the use case. I could suggest GenStage, a custom Stream or even something else.

1 Like

@josevalim

I am actually trying to create this custom stream that in the code I call ManualStream to serve as a “source” for a GenStage producer. I see a bunch of example where the producer reads from a File.stream, I want to push events from a Phoenix Controller. I posted the question more generic because I thought it was a nice problem on its own to better understand how Streams work, but then if you can somehow push events to a GenStage producer directly all the better.

You can’t run a stream that receives messages from another process inside a GenStage. In this case then, the best option, is to implement it directly as a GenStage producer.

To generalize the previous sentence a bit more: streams that work based on the mailbox are in general discouraged because streams are meant to be purely functional and accessing the mailbox does not compose well with processes in general (for example, you can’t run it inside a GenServer). That’s exactly why GenStage is being researched to be part of the language in the future.

3 Likes