Through_stages hangs unbounded flow

Hi

I’ve been playing around with GenStage and Flow and I am trying to understand why the flow below hangs, instead of returning the list of produced elements

defmodule A do
    use GenStage

    def start_link(number) do
      GenStage.start_link(A, number)
    end

    def init(counter) do
      {:producer, counter}
    end

    def handle_demand(demand, counter) when demand > 0 do
      events = Enum.to_list(counter..counter+demand-1)
      Process.send_after self(), :exit, 1000 # Simulate that the producer is done producing work
      {:noreply, events, counter + demand}
    end

    def handle_info(:exit, state) do
      {:stop, :normal, state}
    end
  end

  defmodule B do
    use GenStage

    def start_link(number) do
      GenStage.start_link(B, number)
    end

    def init(number) do
      {:producer_consumer, number}
    end

    def handle_events(events, _from, number) do
      {:noreply, events, number} # Just forward events
    end
  end

The following code will hang

producers = [{A, 0}]
producer_consumers = [{{B, 2}, []}]

numbers =
  producers
  |> Flow.from_specs
  |> Flow.through_specs(producer_consumers) # Hangs here
  |> Enum.to_list

I would expect that when the the producer exits, all the producer_consumers and consumers
finish processing their in-flight events, terminate, after which the events gets forwarded to the Enum module

How would I go about accomplishing the above?

Please see the termination section here: https://hexdocs.pm/flow/Flow.html#through_stages/3-termination

I read that, but I understood it to only be relevant for bounded flows, in this case I’m working against an unbounded one

It is bound because the producer is terminating. :slight_smile: