Waiting in Stream.resource for new elements to stream?

I want to stream the contents of a queue so I can use Flow to process the data. Data is added to the queue asynchronously at undefined times.
The Stream.resource looks kind of like this:

    Stream.resource(fn -> nil end, fn (_) ->
      case Queue.out() do
        {:value, value} -> {[value], nil}
        :empty -> {[], nil}
      end
    end, fn (_) -> nil end)

So my question is: Is it wise or even possible to block in Stream.resource when the queue is empty? Because in the shown code above it would basically be polling for the next element in the queue.
I thought of creating some kind of check for a non empty queue which sleeps for a while when it’s empty before checking again.

How does the queue receive new items? The general way to wait for messages/events in Elixir is to use a receive expression. receive blocks until the next message is received but does so in a performant way as this is a core part of the runtime.

Edit: Maybe look at GenStage.stream.

1 Like

I created a GenServer which wraps around the Erlang :queue. Now I found BlockingQueue which is exactly what I need.
I will look into GenStage.stream and see if that brings any benefits.
Thanks for the hints!