Hi
I’ve been playing around with GenStage and Flow and I am trying to understand why the flow below hangs, instead of returning the list of produced elements
defmodule A do
use GenStage
def start_link(number) do
GenStage.start_link(A, number)
end
def init(counter) do
{:producer, counter}
end
def handle_demand(demand, counter) when demand > 0 do
events = Enum.to_list(counter..counter+demand-1)
Process.send_after self(), :exit, 1000 # Simulate that the producer is done producing work
{:noreply, events, counter + demand}
end
def handle_info(:exit, state) do
{:stop, :normal, state}
end
end
defmodule B do
use GenStage
def start_link(number) do
GenStage.start_link(B, number)
end
def init(number) do
{:producer_consumer, number}
end
def handle_events(events, _from, number) do
{:noreply, events, number} # Just forward events
end
end
The following code will hang
producers = [{A, 0}]
producer_consumers = [{{B, 2}, []}]
numbers =
producers
|> Flow.from_specs
|> Flow.through_specs(producer_consumers) # Hangs here
|> Enum.to_list
I would expect that when the the producer exits, all the producer_consumers and consumers
finish processing their in-flight events, terminate, after which the events gets forwarded to the Enum module
How would I go about accomplishing the above?