Flow, GenStage, help me piece it all together

If I understand correctly:

  1. I can use a GenStage as a :producer
  2. I can add events to the GenStage by something as simple as a handle_cast that appends whatever we pass it to some list in the state
  3. The key function I need implemented in the GenStage producer is the handle_demand callback which will return both the events to dispatch and the events left in the state
  4. I can create a Flow that will draw from GenStage, and I can use a simple periodic window to partition the events and process them at regular intervals

If this is true, then I think this should work:

defmodule MyProducer do
  use GenStage

  def start_link do
    GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
  end

  def init(:ok) do
    {:producer, []}
  end

  def handle_demand(demand, state) do
    {to_dispatch, remaining} = Enum.split(state, demand)
    {:noreply, to_dispatch, remaining}
  end

  def handle_cast({:add_events, events}, state) do
    {:noreply, [], state ++ events}
  end

end


defmodule MyFlow do
  def start_link do
    window = Flow.Window.periodic(1, :minute)
    x =
      Flow.from_stage(MyProducer, window: window)
      |> Flow.map(fn(x) -> x * 2 end)
      |> Flow.emit(:events)
      |> IO.inspect
    Flow.start_link(x)
  end

end

MyProducer.start_link
MyFlow.start_link
GenServer.cast(MyProducer, {:add_events, Enum.to_list(1..100000)})

I would expect this to consume from the producer at some rate, with one minute windows, and … Printing to the console a list every minute or so.

But no. Nothing. What am I doing wrong?